Merge pull request #19 from wuyoukm/patch-8

1.修复红队工具和cve重复推送问题
2.待增加黑名单功能,防止有人恶意批量建仓库
This commit is contained in:
yhy 2021-09-02 10:31:34 +08:00 committed by GitHub
commit eaa826a3a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -2,7 +2,8 @@
# -*- coding:utf-8 -*- # -*- coding:utf-8 -*-
# @Author : yhy&ddm&w4ter # @Author : yhy&ddm&w4ter
# 每3分钟检测一次github是否有新的cve漏洞提交记录若有则通过server酱和钉钉机器人推送二者配置一个即可 # 每3分钟检测一次githu
# 是否有新的cve漏洞提交记录若有则通过server酱和钉钉机器人推送二者配置一个即可
# 建议使用screen命令运行在自己的linux vps后台上就可以愉快的接收各种cve了 # 建议使用screen命令运行在自己的linux vps后台上就可以愉快的接收各种cve了
# https://my.oschina.net/u/4581868/blog/4380482 # https://my.oschina.net/u/4581868/blog/4380482
@ -18,8 +19,7 @@ from lxml import etree
import sqlite3 import sqlite3
import logging import logging
today_cve_info_tmp = []
tools_update_list = []
logging.basicConfig(level=logging.DEBUG, filename='run_info.log') logging.basicConfig(level=logging.DEBUG, filename='run_info.log')
#读取配置文件 #读取配置文件
@ -53,8 +53,8 @@ github_headers = {
#初始化创建数据库 #初始化创建数据库
def create_database(): def create_database():
conn = sqlite3.connect('data.db') conn = sqlite3.connect('data.db')
print("create_database 函数 连接数据库成功!") # print("[]create_database 函数 连接数据库成功!")
logging.info("create_database 函数 连接数据库成功!") # logging.info("create_database 函数 连接数据库成功!")
cur = conn.cursor() cur = conn.cursor()
try: try:
cur.execute('''CREATE TABLE IF NOT EXISTS cve_monitor cur.execute('''CREATE TABLE IF NOT EXISTS cve_monitor
@ -80,8 +80,9 @@ def create_database():
server("test", "连接成功", load_config()[2]) server("test", "连接成功", load_config()[2])
elif load_config()[0] == "tgbot": elif load_config()[0] == "tgbot":
tgbot("test", "连接成功", load_config()[2], load_config()[3]) tgbot("test", "连接成功", load_config()[2], load_config()[3])
# 根据排序获取本年前20条CVE #根据排序获取本年前20条CVE
def getNews(): def getNews():
today_cve_info_tmp = []
try: try:
# 抓取本年的 # 抓取本年的
year = datetime.datetime.now().year year = datetime.datetime.now().year
@ -92,15 +93,20 @@ def getNews():
today_date = datetime.date.today() today_date = datetime.date.today()
for i in range(20): for i in range(20):
cve_url = json_str['items'][i]['html_url'] cve_url = json_str['items'][i]['html_url']
cve_name = json_str['items'][i]['name'].lower() try:
cve_name_tmp = json_str['items'][i]['name'].upper()
cve_name = re.findall('(CVE\-\d+\-\d+)', cve_name_tmp)[0].upper()
except Exception as e:
pass
pushed_at_tmp = json_str['items'][i]['pushed_at'] pushed_at_tmp = json_str['items'][i]['pushed_at']
pushed_at = re.findall('\d{4}-\d{2}-\d{2}', pushed_at_tmp)[0] pushed_at = re.findall('\d{4}-\d{2}-\d{2}', pushed_at_tmp)[0]
# today_cve_info_tmp.append({"cve_name": cve_name, "cve_url": cve_url, "pushed_at": pushed_at}) # today_cve_info_tmp.append({"cve_name": cve_name, "cve_url": cve_url, "pushed_at": pushed_at})
if pushed_at == str(today_date): if pushed_at == str(today_date):
today_cve_info_tmp.append({"cve_name":cve_name,"cve_url":cve_url,"pushed_at":pushed_at}) today_cve_info_tmp.append({"cve_name":cve_name,"cve_url":cve_url,"pushed_at":pushed_at})
else: else:
print("{}的更新时间为{}不属于今天的CVE".format(cve_name,pushed_at)) print("[-] {}的更新时间为{}不属于今天的CVE".format(cve_name,pushed_at))
logging.info("{}的更新时间为{}不属于今天的CVE".format(cve_name,pushed_at)) logging.info("[-] {}的更新时间为{}不属于今天的CVE".format(cve_name,pushed_at))
today_cve_info = OrderedDict() today_cve_info = OrderedDict()
for item in today_cve_info_tmp: for item in today_cve_info_tmp:
today_cve_info.setdefault(item['cve_name'], {**item, }) today_cve_info.setdefault(item['cve_name'], {**item, })
@ -122,7 +128,7 @@ def cve_insert_into_sqlite3(data):
cur = conn.cursor() cur = conn.cursor()
for i in range(len(data)): for i in range(len(data)):
try: try:
cve_name = re.findall('(cve\-\d+\-\d+)', data[i]['cve_name'])[0].upper() cve_name = re.findall('(CVE\-\d+\-\d+)', data[i]['cve_name'])[0].upper()
cur.execute("INSERT INTO cve_monitor (cve_name,pushed_at,cve_url) VALUES ('{}', '{}', '{}')".format(cve_name, data[i]['pushed_at'], data[i]['cve_url'])) cur.execute("INSERT INTO cve_monitor (cve_name,pushed_at,cve_url) VALUES ('{}', '{}', '{}')".format(cve_name, data[i]['pushed_at'], data[i]['cve_url']))
print("cve_insert_into_sqlite3 函数: {}插入数据成功!".format(cve_name)) print("cve_insert_into_sqlite3 函数: {}插入数据成功!".format(cve_name))
logging.info("cve_insert_into_sqlite3 函数: {}插入数据成功!".format(cve_name)) logging.info("cve_insert_into_sqlite3 函数: {}插入数据成功!".format(cve_name))
@ -137,13 +143,24 @@ def query_cve_info_database(cve_name):
sql_grammar = "SELECT cve_name FROM cve_monitor WHERE cve_name = '{}';".format(cve_name) sql_grammar = "SELECT cve_name FROM cve_monitor WHERE cve_name = '{}';".format(cve_name)
cursor = cur.execute(sql_grammar) cursor = cur.execute(sql_grammar)
return len(list(cursor)) return len(list(cursor))
#查询数据库里是否存在该tools工具名字的方法
def query_tools_info_database(tools_name):
conn = sqlite3.connect('data.db')
cur = conn.cursor()
sql_grammar = "SELECT tools_name FROM redteam_tools_monitor WHERE tools_name = '{}';".format(tools_name)
cursor = cur.execute(sql_grammar)
return len(list(cursor))
#获取不存在数据库里的CVE信息 #获取不存在数据库里的CVE信息
def get_today_cve_info(today_cve_info_data): def get_today_cve_info(today_cve_info_data):
today_all_cve_info = [] today_all_cve_info = []
# today_cve_info_data = getNews() # today_cve_info_data = getNews()
for i in range(len(today_cve_info_data)): for i in range(len(today_cve_info_data)):
today_cve_name = today_cve_info_data[i]['cve_name'] try:
today_cve_name = re.findall('(CVE\-\d+\-\d+)', today_cve_info_data[i]['cve_name'])[0].upper()
logging.info("get_today_cve_info 函数 today_cve_name的值为 - > {}".format(today_cve_name))
Verify = query_cve_info_database(today_cve_name.upper()) Verify = query_cve_info_database(today_cve_name.upper())
except Exception as e:
Verify = 1
if Verify == 0: if Verify == 0:
print("[+] 数据库里不存在{}".format(today_cve_name.upper())) print("[+] 数据库里不存在{}".format(today_cve_name.upper()))
logging.info("[+] 数据库里不存在{}".format(today_cve_name.upper())) logging.info("[+] 数据库里不存在{}".format(today_cve_name.upper()))
@ -159,9 +176,16 @@ def tools_insert_into_sqlite3(data):
logging.info("tools_insert_into_sqlite3 函数 打开数据库成功!") logging.info("tools_insert_into_sqlite3 函数 打开数据库成功!")
cur = conn.cursor() cur = conn.cursor()
for i in range(len(data)): for i in range(len(data)):
cur.execute("INSERT INTO redteam_tools_monitor (tools_name,pushed_at,tag_name) VALUES ('{}', '{}','{}')".format(data[i]['tools_name'], data[i]['pushed_at'],data[i]['tag_name'])) Verify = query_tools_info_database(data[i]['tools_name'])
if Verify == 0:
print("[+] 红队工具表数据库里不存在{}".format(data[i]['tools_name']))
logging.info("[+] 红队工具表数据库里不存在{}".format(data[i]['tools_name']))
cur.execute("INSERT INTO redteam_tools_monitor (tools_name,pushed_at,tag_name) VALUES ('{}', '{}','{}')".format(data[i]['tools_name'], data[i]['pushed_at'], data[i]['tag_name']))
print("tools_insert_into_sqlite3 函数: {}插入数据成功!".format(format(data[i]['tools_name']))) print("tools_insert_into_sqlite3 函数: {}插入数据成功!".format(format(data[i]['tools_name'])))
logging.info("tools_insert_into_sqlite3 函数: {}插入数据成功!".format(format(data[i]['tools_name']))) logging.info("tools_insert_into_sqlite3 函数: {}插入数据成功!".format(format(data[i]['tools_name'])))
else:
print("[-] 红队工具表数据库里存在{}".format(data[i]['tools_name']))
logging.info("[-] 红队工具表数据库里存在{}".format(data[i]['tools_name']))
conn.commit() conn.commit()
conn.close() conn.close()
#读取本地红队工具链接文件转换成list #读取本地红队工具链接文件转换成list
@ -175,11 +199,11 @@ def get_pushed_at_time(tools_list):
for url in tools_list: for url in tools_list:
tools_json = requests.get(url, headers=github_headers, timeout=10).json() tools_json = requests.get(url, headers=github_headers, timeout=10).json()
pushed_at_tmp = tools_json['pushed_at'] pushed_at_tmp = tools_json['pushed_at']
pushed_at = re.findall('\d{4}-\d{2}-\d{2}', pushed_at_tmp)[0] pushed_at = re.findall('\d{4}-\d{2}-\d{2}', pushed_at_tmp)[0] #获取的是API上的时间
tools_name = tools_json['name'] tools_name = tools_json['name']
api_url = tools_json['url'] api_url = tools_json['url']
try: try:
releases_json = requests.get(url[0]+"/releases", headers=github_headers, timeout=10).json() releases_json = requests.get(url+"/releases", headers=github_headers, timeout=10).json()
tag_name = releases_json[0]['tag_name'] tag_name = releases_json[0]['tag_name']
except Exception as e: except Exception as e:
tag_name = "no releases" tag_name = "no releases"
@ -188,23 +212,37 @@ def get_pushed_at_time(tools_list):
return tools_info_list return tools_info_list
#根据红队名名称查询数据库红队工具的更新时间以及版本名称并返回 #根据红队名名称查询数据库红队工具的更新时间以及版本名称并返回
def tools_query_sqlite3(tools_name): def tools_query_sqlite3(tools_name):
result_list = []
conn = sqlite3.connect('data.db') conn = sqlite3.connect('data.db')
cur = conn.cursor() cur = conn.cursor()
sql_grammar = "SELECT pushed_at,tag_name FROM redteam_tools_monitor WHERE tools_name = '{}';".format(tools_name) sql_grammar = "SELECT pushed_at,tag_name FROM redteam_tools_monitor WHERE tools_name = '{}';".format(tools_name)
cursor = cur.execute(sql_grammar) cursor = cur.execute(sql_grammar)
for result in cursor: for result in cursor:
return (result[0],result[1]) result_list.append({"pushed_at":result[0],"tag_name":result[1]})
conn.close() conn.close()
#获取更新了的红队工具信息 print("[###########] tools_query_sqlite3 函数内 result_list 的值 为 - > {}".format(result_list))
logging.info("[###########] tools_query_sqlite3 函数内 result_list 的值 为 - > {}".format(result_list))
return result_list
#获取更新了的红队工具在数据库里面的时间和版本
def get_tools_update_list(data): def get_tools_update_list(data):
tools_update_list = []
for dist in data: for dist in data:
print("dist 变量 ->{}".format(dist))
logging.info("dist 变量 ->{}".format(dist))
query_result = tools_query_sqlite3(dist['tools_name']) query_result = tools_query_sqlite3(dist['tools_name'])
today_tools_pushed_at = query_result[0] today_tools_pushed_at = query_result[0]['pushed_at']
logging.info("[###########] get_tools_update_list 函数内 today_tools_pushed_at的值 - >{}".format(today_tools_pushed_at))
# print("[!!] 今日获取时间: ", dist['pushed_at'], "获取数据库时间: ", today_tools_pushed_at, dist['tools_name'])
if dist['pushed_at'] != today_tools_pushed_at: if dist['pushed_at'] != today_tools_pushed_at:
print("今日获取时间: ",dist['pushed_at'],"获取数据库时间: ",today_tools_pushed_at,dist['tools_name'],"update!!!!") print("今日获取时间: ",dist['pushed_at'],"获取数据库时间: ",today_tools_pushed_at,dist['tools_name'],"update!!!!")
logging.info("今日获取时间: ",dist['pushed_at'],"获取数据库时间: ",today_tools_pushed_at,dist['tools_name'],"update!!!!") logging.info("[!] tools_name: {} 今日API时间: {} 获取数据库时间: {} {}update!!!!".format(dist['tools_name'],dist['pushed_at'],today_tools_pushed_at,dist['tools_name']))
#返回数据库里面的时间和版本 #返回数据库里面的时间和版本
tools_update_list.append({"api_url":dist['api_url'],"pushed_at":today_tools_pushed_at,"tag_name":query_result[1]}) tools_update_list.append({"api_url":dist['api_url'],"pushed_at":today_tools_pushed_at,"tag_name":query_result[0]['tag_name']})
else:
print("今日获取时间: ",dist['pushed_at'],"获取数据库时间: ",today_tools_pushed_at,dist['tools_name']," no update")
logging.info("[-] tools_name: {} 今日API时间: {} 获取数据库时间: {} {} no update".format(dist['tools_name'],dist['pushed_at'],today_tools_pushed_at,dist['tools_name']))
logging.info("get_tools_update_list 函数 tools_update_list数组的值为{}".format(tools_update_list))
# todo BUG在数组
return tools_update_list return tools_update_list
#获取更新信息并发送到对应社交软件 #获取更新信息并发送到对应社交软件
def send_dingding(url,query_pushed_at,query_tag_name): def send_dingding(url,query_pushed_at,query_tag_name):
@ -213,13 +251,11 @@ def send_dingding(url,query_pushed_at,query_tag_name):
json_str = requests.get(url + '/releases', headers=github_headers, timeout=10).json() json_str = requests.get(url + '/releases', headers=github_headers, timeout=10).json()
new_pushed_at = re.findall('\d{4}-\d{2}-\d{2}', requests.get(url, headers=github_headers, timeout=10).json()['pushed_at'])[0] new_pushed_at = re.findall('\d{4}-\d{2}-\d{2}', requests.get(url, headers=github_headers, timeout=10).json()['pushed_at'])[0]
if len(json_str) != 0: if len(json_str) != 0:
release_published_at_tmp = json_str[0]['published_at']
release_published_at = re.findall('\d{4}-\d{2}-\d{2}', release_published_at_tmp)[0]
tag_name = json_str[0]['tag_name'] tag_name = json_str[0]['tag_name']
if query_pushed_at != new_pushed_at :
print("[*] 数据库里的pushed_at -->",query_pushed_at,";;;; api的pushed_at -->",new_pushed_at) print("[*] 数据库里的pushed_at -->", query_pushed_at, ";;;; api的pushed_at -->", new_pushed_at)
logging.info("[*] 数据库里的pushed_at -->",query_pushed_at,";;;; api的pushed_at -->",new_pushed_at) logging.info("[*] {}工具在数据库里的pushed_at -->{} api的pushed_at -->{}".format(url.split('/')[-1],query_pushed_at,new_pushed_at))
if query_pushed_at != new_pushed_at and tag_name != query_tag_name: if tag_name != query_tag_name:
try: try:
update_log = json_str[0]['body'] update_log = json_str[0]['body']
except Exception as e: except Exception as e:
@ -243,8 +279,8 @@ def send_dingding(url,query_pushed_at,query_tag_name):
conn.commit() conn.commit()
conn.close() conn.close()
print("[+] tools_name -->", tools_name, "pushed_at 已更新现在pushed_at 为 -->", new_pushed_at,"tag_name 已更新现在tag_name为 -->",tag_name) print("[+] tools_name -->", tools_name, "pushed_at 已更新现在pushed_at 为 -->", new_pushed_at,"tag_name 已更新现在tag_name为 -->",tag_name)
logging.info("[+] tools_name -->", tools_name, "pushed_at 已更新现在pushed_at 为 -->", new_pushed_at,"tag_name 已更新现在tag_name为 -->",tag_name) logging.info("[+] tools_name -->{} the tools have releases pushed_at 已更新现在pushed_at 为 -->{} tag_name 已更新现在tag_name为 -->{}".format(tools_name,new_pushed_at,tag_name))
else: elif tag_name == query_tag_name:
commits_url = url + "/commits" commits_url = url + "/commits"
commits_url_response_json = requests.get(commits_url).text commits_url_response_json = requests.get(commits_url).text
commits_json = json.loads(commits_url_response_json) commits_json = json.loads(commits_url_response_json)
@ -269,10 +305,13 @@ def send_dingding(url,query_pushed_at,query_tag_name):
conn.commit() conn.commit()
conn.close() conn.close()
print("[+] tools_name -->",tools_name,"pushed_at 已更新现在pushed_at 为 -->",new_pushed_at) print("[+] tools_name -->",tools_name,"pushed_at 已更新现在pushed_at 为 -->",new_pushed_at)
logging.info("[+] tools_name -->",tools_name,"pushed_at 已更新现在pushed_at 为 -->",new_pushed_at) logging.info("[+] tools_name -->{} the tools have releases pushed_at 已更新现在pushed_at 为 -->{}".format(tools_name,new_pushed_at))
# return update_log, download_url, tools_version # return update_log, download_url, tools_version
else: else:
if query_pushed_at != new_pushed_at:
print("[*] 数据库里的pushed_at -->", query_pushed_at, ";;;; api的pushed_at -->", new_pushed_at)
logging.info("[*] {}工具在数据库里的pushed_at -->{} api的pushed_at -->{}".format(url.split('/')[-1],query_pushed_at,new_pushed_at))
json_str = requests.get(url + '/commits', headers=github_headers, timeout=10).json() json_str = requests.get(url + '/commits', headers=github_headers, timeout=10).json()
update_log = json_str[0]['commit']['message'] update_log = json_str[0]['commit']['message']
download_url = json_str[0]['html_url'] download_url = json_str[0]['html_url']
@ -292,7 +331,7 @@ def send_dingding(url,query_pushed_at,query_tag_name):
conn.commit() conn.commit()
conn.close() conn.close()
print("[+] tools_name -->", tools_name, "pushed_at 已更新现在pushed_at 为 -->", new_pushed_at) print("[+] tools_name -->", tools_name, "pushed_at 已更新现在pushed_at 为 -->", new_pushed_at)
logging.info("[+] tools_name -->", tools_name, "pushed_at 已更新现在pushed_at 为 -->", new_pushed_at) logging.info("[+] tools_name -->{} the tools no releases pushed_at 已更新现在pushed_at 为 -->{}".format(tools_name,new_pushed_at))
# return update_log, download_url # return update_log, download_url
# 创建md5对象 # 创建md5对象
def nmd5(str): def nmd5(str):
@ -376,7 +415,7 @@ def sendNews(data):
# 获取 cve 名字 根据cve 名字,获取描述,并翻译 # 获取 cve 名字 根据cve 名字,获取描述,并翻译
for i in range(len(data)): for i in range(len(data)):
try: try:
cve_name = re.findall('(cve\-\d+\-\d+)', data[i]['cve_name'])[0].upper() cve_name = re.findall('(CVE\-\d+\-\d+)', data[i]['cve_name'])[0].upper()
cve_zh = get_cve_des_zh(cve_name) cve_zh = get_cve_des_zh(cve_name)
body = "CVE编号" + cve_name + "\r\n" + "Github地址" + str(data[i]['cve_url']) + "\r\n" + "CVE描述" + "\r\n" + cve_zh body = "CVE编号" + cve_name + "\r\n" + "Github地址" + str(data[i]['cve_url']) + "\r\n" + "CVE描述" + "\r\n" + cve_zh
if load_config()[0] == "dingding": if load_config()[0] == "dingding":
@ -403,12 +442,11 @@ if __name__ == '__main__':
#初始化部分 #初始化部分
create_database() create_database()
while True:
tools_list = load_tools_list() tools_list = load_tools_list()
tools_data = get_pushed_at_time(tools_list) tools_data = get_pushed_at_time(tools_list)
tools_insert_into_sqlite3(tools_data) tools_insert_into_sqlite3(tools_data)
while True:
try:
#CVE部分 #CVE部分
cve_data = getNews() cve_data = getNews()
today_cve_data = get_today_cve_info(cve_data) today_cve_data = get_today_cve_info(cve_data)
@ -416,11 +454,13 @@ if __name__ == '__main__':
cve_insert_into_sqlite3(today_cve_data) cve_insert_into_sqlite3(today_cve_data)
#红队工具部分 #红队工具部分
time.sleep(3) time.sleep(3)
tools_list = load_tools_list() tools_list_new = load_tools_list()
data2 = get_pushed_at_time(tools_list) data2 = get_pushed_at_time(tools_list_new)
data3 = get_tools_update_list(data2) data3 = get_tools_update_list(data2)
for i in range(len(data3)): for i in range(len(data3)):
try:
logging.error("[+++] data3 数据 : api_url - > {} pushed_at - > {} tag_name - > {}".format(data3[i]['api_url'],data3[i]['pushed_at'],data3[i]['tag_name']))
send_dingding(data3[i]['api_url'],data3[i]['pushed_at'],data3[i]['tag_name']) send_dingding(data3[i]['api_url'],data3[i]['pushed_at'],data3[i]['tag_name'])
except Exception as e: except Exception as e:
print("main 函数 遇到错误-->{}".format(e)) print("main函数 try循环 遇到错误-->{}".format(e))
logging.error("main 函数 遇到错误-->{}".format(e)) logging.error("main函数 try循环 遇到错误-->{}".format(e))