diff --git a/github_cve_monitor.py b/github_cve_monitor.py index 1377282..d4e800c 100644 --- a/github_cve_monitor.py +++ b/github_cve_monitor.py @@ -2,7 +2,8 @@ # -*- coding:utf-8 -*- # @Author : yhy&ddm&w4ter -# 每3分钟检测一次github是否有新的cve漏洞提交记录,若有则通过server酱和钉钉机器人推送(二者配置一个即可) +# 每3分钟检测一次githu +# 是否有新的cve漏洞提交记录,若有则通过server酱和钉钉机器人推送(二者配置一个即可) # 建议使用screen命令运行在自己的linux vps后台上,就可以愉快的接收各种cve了 # https://my.oschina.net/u/4581868/blog/4380482 @@ -18,8 +19,7 @@ from lxml import etree import sqlite3 import logging -today_cve_info_tmp = [] -tools_update_list = [] + logging.basicConfig(level=logging.DEBUG, filename='run_info.log') #读取配置文件 @@ -53,8 +53,8 @@ github_headers = { #初始化创建数据库 def create_database(): conn = sqlite3.connect('data.db') - print("create_database 函数 连接数据库成功!") - logging.info("create_database 函数 连接数据库成功!") + # print("[]create_database 函数 连接数据库成功!") + # logging.info("create_database 函数 连接数据库成功!") cur = conn.cursor() try: cur.execute('''CREATE TABLE IF NOT EXISTS cve_monitor @@ -80,8 +80,9 @@ def create_database(): server("test", "连接成功", load_config()[2]) elif load_config()[0] == "tgbot": tgbot("test", "连接成功", load_config()[2], load_config()[3]) -# 根据排序获取本年前20条CVE +#根据排序获取本年前20条CVE def getNews(): + today_cve_info_tmp = [] try: # 抓取本年的 year = datetime.datetime.now().year @@ -92,15 +93,20 @@ def getNews(): today_date = datetime.date.today() for i in range(20): cve_url = json_str['items'][i]['html_url'] - cve_name = json_str['items'][i]['name'].lower() + try: + cve_name_tmp = json_str['items'][i]['name'].upper() + cve_name = re.findall('(CVE\-\d+\-\d+)', cve_name_tmp)[0].upper() + except Exception as e: + pass pushed_at_tmp = json_str['items'][i]['pushed_at'] pushed_at = re.findall('\d{4}-\d{2}-\d{2}', pushed_at_tmp)[0] # today_cve_info_tmp.append({"cve_name": cve_name, "cve_url": cve_url, "pushed_at": pushed_at}) if pushed_at == str(today_date): today_cve_info_tmp.append({"cve_name":cve_name,"cve_url":cve_url,"pushed_at":pushed_at}) + else: - print("该{}的更新时间为{},不属于今天的CVE".format(cve_name,pushed_at)) - logging.info("该{}的更新时间为{},不属于今天的CVE".format(cve_name,pushed_at)) + print("[-] 该{}的更新时间为{},不属于今天的CVE".format(cve_name,pushed_at)) + logging.info("[-] 该{}的更新时间为{},不属于今天的CVE".format(cve_name,pushed_at)) today_cve_info = OrderedDict() for item in today_cve_info_tmp: today_cve_info.setdefault(item['cve_name'], {**item, }) @@ -122,7 +128,7 @@ def cve_insert_into_sqlite3(data): cur = conn.cursor() for i in range(len(data)): try: - cve_name = re.findall('(cve\-\d+\-\d+)', data[i]['cve_name'])[0].upper() + cve_name = re.findall('(CVE\-\d+\-\d+)', data[i]['cve_name'])[0].upper() cur.execute("INSERT INTO cve_monitor (cve_name,pushed_at,cve_url) VALUES ('{}', '{}', '{}')".format(cve_name, data[i]['pushed_at'], data[i]['cve_url'])) print("cve_insert_into_sqlite3 函数: {}插入数据成功!".format(cve_name)) logging.info("cve_insert_into_sqlite3 函数: {}插入数据成功!".format(cve_name)) @@ -137,13 +143,24 @@ def query_cve_info_database(cve_name): sql_grammar = "SELECT cve_name FROM cve_monitor WHERE cve_name = '{}';".format(cve_name) cursor = cur.execute(sql_grammar) return len(list(cursor)) +#查询数据库里是否存在该tools工具名字的方法 +def query_tools_info_database(tools_name): + conn = sqlite3.connect('data.db') + cur = conn.cursor() + sql_grammar = "SELECT tools_name FROM redteam_tools_monitor WHERE tools_name = '{}';".format(tools_name) + cursor = cur.execute(sql_grammar) + return len(list(cursor)) #获取不存在数据库里的CVE信息 def get_today_cve_info(today_cve_info_data): today_all_cve_info = [] # today_cve_info_data = getNews() for i in range(len(today_cve_info_data)): - today_cve_name = today_cve_info_data[i]['cve_name'] - Verify = query_cve_info_database(today_cve_name.upper()) + try: + today_cve_name = re.findall('(CVE\-\d+\-\d+)', today_cve_info_data[i]['cve_name'])[0].upper() + logging.info("get_today_cve_info 函数 today_cve_name的值为 - > {}".format(today_cve_name)) + Verify = query_cve_info_database(today_cve_name.upper()) + except Exception as e: + Verify = 1 if Verify == 0: print("[+] 数据库里不存在{}".format(today_cve_name.upper())) logging.info("[+] 数据库里不存在{}".format(today_cve_name.upper())) @@ -159,9 +176,16 @@ def tools_insert_into_sqlite3(data): logging.info("tools_insert_into_sqlite3 函数 打开数据库成功!") cur = conn.cursor() for i in range(len(data)): - cur.execute("INSERT INTO redteam_tools_monitor (tools_name,pushed_at,tag_name) VALUES ('{}', '{}','{}')".format(data[i]['tools_name'], data[i]['pushed_at'],data[i]['tag_name'])) - print("tools_insert_into_sqlite3 函数: {}插入数据成功!".format(format(data[i]['tools_name']))) - logging.info("tools_insert_into_sqlite3 函数: {}插入数据成功!".format(format(data[i]['tools_name']))) + Verify = query_tools_info_database(data[i]['tools_name']) + if Verify == 0: + print("[+] 红队工具表数据库里不存在{}".format(data[i]['tools_name'])) + logging.info("[+] 红队工具表数据库里不存在{}".format(data[i]['tools_name'])) + cur.execute("INSERT INTO redteam_tools_monitor (tools_name,pushed_at,tag_name) VALUES ('{}', '{}','{}')".format(data[i]['tools_name'], data[i]['pushed_at'], data[i]['tag_name'])) + print("tools_insert_into_sqlite3 函数: {}插入数据成功!".format(format(data[i]['tools_name']))) + logging.info("tools_insert_into_sqlite3 函数: {}插入数据成功!".format(format(data[i]['tools_name']))) + else: + print("[-] 红队工具表数据库里存在{}".format(data[i]['tools_name'])) + logging.info("[-] 红队工具表数据库里存在{}".format(data[i]['tools_name'])) conn.commit() conn.close() #读取本地红队工具链接文件转换成list @@ -175,11 +199,11 @@ def get_pushed_at_time(tools_list): for url in tools_list: tools_json = requests.get(url, headers=github_headers, timeout=10).json() pushed_at_tmp = tools_json['pushed_at'] - pushed_at = re.findall('\d{4}-\d{2}-\d{2}', pushed_at_tmp)[0] + pushed_at = re.findall('\d{4}-\d{2}-\d{2}', pushed_at_tmp)[0] #获取的是API上的时间 tools_name = tools_json['name'] api_url = tools_json['url'] try: - releases_json = requests.get(url[0]+"/releases", headers=github_headers, timeout=10).json() + releases_json = requests.get(url+"/releases", headers=github_headers, timeout=10).json() tag_name = releases_json[0]['tag_name'] except Exception as e: tag_name = "no releases" @@ -188,23 +212,37 @@ def get_pushed_at_time(tools_list): return tools_info_list #根据红队名名称查询数据库红队工具的更新时间以及版本名称并返回 def tools_query_sqlite3(tools_name): + result_list = [] conn = sqlite3.connect('data.db') cur = conn.cursor() sql_grammar = "SELECT pushed_at,tag_name FROM redteam_tools_monitor WHERE tools_name = '{}';".format(tools_name) cursor = cur.execute(sql_grammar) for result in cursor: - return (result[0],result[1]) + result_list.append({"pushed_at":result[0],"tag_name":result[1]}) conn.close() -#获取更新了的红队工具信息 + print("[###########] tools_query_sqlite3 函数内 result_list 的值 为 - > {}".format(result_list)) + logging.info("[###########] tools_query_sqlite3 函数内 result_list 的值 为 - > {}".format(result_list)) + return result_list +#获取更新了的红队工具在数据库里面的时间和版本 def get_tools_update_list(data): + tools_update_list = [] for dist in data: + print("dist 变量 ->{}".format(dist)) + logging.info("dist 变量 ->{}".format(dist)) query_result = tools_query_sqlite3(dist['tools_name']) - today_tools_pushed_at = query_result[0] + today_tools_pushed_at = query_result[0]['pushed_at'] + logging.info("[###########] get_tools_update_list 函数内 today_tools_pushed_at的值 - >{}".format(today_tools_pushed_at)) + # print("[!!] 今日获取时间: ", dist['pushed_at'], "获取数据库时间: ", today_tools_pushed_at, dist['tools_name']) if dist['pushed_at'] != today_tools_pushed_at: print("今日获取时间: ",dist['pushed_at'],"获取数据库时间: ",today_tools_pushed_at,dist['tools_name'],"update!!!!") - logging.info("今日获取时间: ",dist['pushed_at'],"获取数据库时间: ",today_tools_pushed_at,dist['tools_name'],"update!!!!") + logging.info("[!] tools_name: {} 今日API时间: {} 获取数据库时间: {} {}update!!!!".format(dist['tools_name'],dist['pushed_at'],today_tools_pushed_at,dist['tools_name'])) #返回数据库里面的时间和版本 - tools_update_list.append({"api_url":dist['api_url'],"pushed_at":today_tools_pushed_at,"tag_name":query_result[1]}) + tools_update_list.append({"api_url":dist['api_url'],"pushed_at":today_tools_pushed_at,"tag_name":query_result[0]['tag_name']}) + else: + print("今日获取时间: ",dist['pushed_at'],"获取数据库时间: ",today_tools_pushed_at,dist['tools_name']," no update") + logging.info("[-] tools_name: {} 今日API时间: {} 获取数据库时间: {} {} no update".format(dist['tools_name'],dist['pushed_at'],today_tools_pushed_at,dist['tools_name'])) + logging.info("get_tools_update_list 函数 tools_update_list数组的值为{}".format(tools_update_list)) + # todo BUG在数组 return tools_update_list #获取更新信息并发送到对应社交软件 def send_dingding(url,query_pushed_at,query_tag_name): @@ -213,87 +251,88 @@ def send_dingding(url,query_pushed_at,query_tag_name): json_str = requests.get(url + '/releases', headers=github_headers, timeout=10).json() new_pushed_at = re.findall('\d{4}-\d{2}-\d{2}', requests.get(url, headers=github_headers, timeout=10).json()['pushed_at'])[0] if len(json_str) != 0: - release_published_at_tmp = json_str[0]['published_at'] - release_published_at = re.findall('\d{4}-\d{2}-\d{2}', release_published_at_tmp)[0] tag_name = json_str[0]['tag_name'] + if query_pushed_at != new_pushed_at : + print("[*] 数据库里的pushed_at -->", query_pushed_at, ";;;; api的pushed_at -->", new_pushed_at) + logging.info("[*] {}工具在数据库里的pushed_at -->{} api的pushed_at -->{}".format(url.split('/')[-1],query_pushed_at,new_pushed_at)) + if tag_name != query_tag_name: + try: + update_log = json_str[0]['body'] + except Exception as e: + update_log = "作者未写更新内容" + download_url = json_str[0]['html_url'] + tools_name = url.split('/')[-1] + text = r'** ' + tools_name + r' ** 工具,版本更新啦!' + body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "工具更新日志:" + "\r\n" + update_log + if load_config()[0] == "dingding": + dingding(text, body,load_config()[2],load_config()[3]) + elif load_config()[0] == "server": + server(text, body,load_config()[2]) + elif load_config()[0] == "tgbot": + tgbot(text,body,load_config()[2],load_config()[3]) + conn = sqlite3.connect('data.db') + cur = conn.cursor() + sql_grammar = "UPDATE redteam_tools_monitor SET tag_name = '{}' WHERE tools_name='{}'".format(tag_name,tools_name) + sql_grammar1 = "UPDATE redteam_tools_monitor SET pushed_at = '{}' WHERE tools_name='{}'".format(new_pushed_at, tools_name) + cur.execute(sql_grammar) + cur.execute(sql_grammar1) + conn.commit() + conn.close() + print("[+] tools_name -->", tools_name, "pushed_at 已更新,现在pushed_at 为 -->", new_pushed_at,"tag_name 已更新,现在tag_name为 -->",tag_name) + logging.info("[+] tools_name -->{} the tools have releases pushed_at 已更新,现在pushed_at 为 -->{} tag_name 已更新,现在tag_name为 -->{}".format(tools_name,new_pushed_at,tag_name)) + elif tag_name == query_tag_name: + commits_url = url + "/commits" + commits_url_response_json = requests.get(commits_url).text + commits_json = json.loads(commits_url_response_json) + tools_name = url.split('/')[-1] + download_url = commits_json[0]['html_url'] + try: + update_log = commits_json[0]['commit']['message'] + except Exception as e: + update_log = "作者未写更新内容,具体点击更新详情地址的URL进行查看" + text = r'** ' + tools_name + r' ** 工具小更新了一波!' + body = "工具名称:" + tools_name + "\r\n" + "更新详情地址:" + download_url + "\r\n" + "commit更新日志:" + "\r\n" + update_log + if load_config()[0] == "dingding": + dingding(text, body,load_config()[2],load_config()[3]) + elif load_config()[0] == "server": + server(text, body,load_config()[2]) + elif load_config()[0] == "tgbot": + tgbot(text,body,load_config()[2],load_config()[3]) + conn = sqlite3.connect('data.db') + cur = conn.cursor() + sql_grammar = "UPDATE redteam_tools_monitor SET pushed_at = '{}' WHERE tools_name='{}'".format(new_pushed_at,tools_name) + cur.execute(sql_grammar) + conn.commit() + conn.close() + print("[+] tools_name -->",tools_name,"pushed_at 已更新,现在pushed_at 为 -->",new_pushed_at) + logging.info("[+] tools_name -->{} the tools have releases pushed_at 已更新,现在pushed_at 为 -->{}".format(tools_name,new_pushed_at)) - print("[*] 数据库里的pushed_at -->",query_pushed_at,";;;; api的pushed_at -->",new_pushed_at) - logging.info("[*] 数据库里的pushed_at -->",query_pushed_at,";;;; api的pushed_at -->",new_pushed_at) - if query_pushed_at != new_pushed_at and tag_name != query_tag_name: - try: - update_log = json_str[0]['body'] - except Exception as e: - update_log = "作者未写更新内容" + # return update_log, download_url, tools_version + else: + if query_pushed_at != new_pushed_at: + print("[*] 数据库里的pushed_at -->", query_pushed_at, ";;;; api的pushed_at -->", new_pushed_at) + logging.info("[*] {}工具在数据库里的pushed_at -->{} api的pushed_at -->{}".format(url.split('/')[-1],query_pushed_at,new_pushed_at)) + json_str = requests.get(url + '/commits', headers=github_headers, timeout=10).json() + update_log = json_str[0]['commit']['message'] download_url = json_str[0]['html_url'] tools_name = url.split('/')[-1] - text = r'** ' + tools_name + r' ** 工具,版本更新啦!' - body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "工具更新日志:" + "\r\n" + update_log + text = r'** ' + tools_name + r' ** 工具更新啦!' + body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "commit更新日志:" + "\r\n" + update_log if load_config()[0] == "dingding": - dingding(text, body,load_config()[2],load_config()[3]) + dingding(text, body, load_config()[2], load_config()[3]) elif load_config()[0] == "server": - server(text, body,load_config()[2]) + server(text, body, load_config()[2]) elif load_config()[0] == "tgbot": - tgbot(text,body,load_config()[2],load_config()[3]) - conn = sqlite3.connect('data.db') - cur = conn.cursor() - sql_grammar = "UPDATE redteam_tools_monitor SET tag_name = '{}' WHERE tools_name='{}'".format(tag_name,tools_name) - sql_grammar1 = "UPDATE redteam_tools_monitor SET pushed_at = '{}' WHERE tools_name='{}'".format(new_pushed_at, tools_name) - cur.execute(sql_grammar) - cur.execute(sql_grammar1) - conn.commit() - conn.close() - print("[+] tools_name -->", tools_name, "pushed_at 已更新,现在pushed_at 为 -->", new_pushed_at,"tag_name 已更新,现在tag_name为 -->",tag_name) - logging.info("[+] tools_name -->", tools_name, "pushed_at 已更新,现在pushed_at 为 -->", new_pushed_at,"tag_name 已更新,现在tag_name为 -->",tag_name) - else: - commits_url = url + "/commits" - commits_url_response_json = requests.get(commits_url).text - commits_json = json.loads(commits_url_response_json) - tools_name = url.split('/')[-1] - download_url = commits_json[0]['html_url'] - try: - update_log = commits_json[0]['commit']['message'] - except Exception as e: - update_log = "作者未写更新内容,具体点击更新详情地址的URL进行查看" - text = r'** ' + tools_name + r' ** 工具小更新了一波!' - body = "工具名称:" + tools_name + "\r\n" + "更新详情地址:" + download_url + "\r\n" + "commit更新日志:" + "\r\n" + update_log - if load_config()[0] == "dingding": - dingding(text, body,load_config()[2],load_config()[3]) - elif load_config()[0] == "server": - server(text, body,load_config()[2]) - elif load_config()[0] == "tgbot": - tgbot(text,body,load_config()[2],load_config()[3]) + tgbot(text, body, load_config()[2], load_config()[3]) conn = sqlite3.connect('data.db') cur = conn.cursor() sql_grammar = "UPDATE redteam_tools_monitor SET pushed_at = '{}' WHERE tools_name='{}'".format(new_pushed_at,tools_name) cur.execute(sql_grammar) conn.commit() conn.close() - print("[+] tools_name -->",tools_name,"pushed_at 已更新,现在pushed_at 为 -->",new_pushed_at) - logging.info("[+] tools_name -->",tools_name,"pushed_at 已更新,现在pushed_at 为 -->",new_pushed_at) - - # return update_log, download_url, tools_version - else: - json_str = requests.get(url + '/commits', headers=github_headers, timeout=10).json() - update_log = json_str[0]['commit']['message'] - download_url = json_str[0]['html_url'] - tools_name = url.split('/')[-1] - text = r'** ' + tools_name + r' ** 工具更新啦!' - body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "commit更新日志:" + "\r\n" + update_log - if load_config()[0] == "dingding": - dingding(text, body, load_config()[2], load_config()[3]) - elif load_config()[0] == "server": - server(text, body, load_config()[2]) - elif load_config()[0] == "tgbot": - tgbot(text, body, load_config()[2], load_config()[3]) - conn = sqlite3.connect('data.db') - cur = conn.cursor() - sql_grammar = "UPDATE redteam_tools_monitor SET pushed_at = '{}' WHERE tools_name='{}'".format(new_pushed_at,tools_name) - cur.execute(sql_grammar) - conn.commit() - conn.close() - print("[+] tools_name -->", tools_name, "pushed_at 已更新,现在pushed_at 为 -->", new_pushed_at) - logging.info("[+] tools_name -->", tools_name, "pushed_at 已更新,现在pushed_at 为 -->", new_pushed_at) - # return update_log, download_url + print("[+] tools_name -->", tools_name, "pushed_at 已更新,现在pushed_at 为 -->", new_pushed_at) + logging.info("[+] tools_name -->{} the tools no releases pushed_at 已更新,现在pushed_at 为 -->{}".format(tools_name,new_pushed_at)) + # return update_log, download_url # 创建md5对象 def nmd5(str): m = hashlib.md5() @@ -376,7 +415,7 @@ def sendNews(data): # 获取 cve 名字 ,根据cve 名字,获取描述,并翻译 for i in range(len(data)): try: - cve_name = re.findall('(cve\-\d+\-\d+)', data[i]['cve_name'])[0].upper() + cve_name = re.findall('(CVE\-\d+\-\d+)', data[i]['cve_name'])[0].upper() cve_zh = get_cve_des_zh(cve_name) body = "CVE编号:" + cve_name + "\r\n" + "Github地址:" + str(data[i]['cve_url']) + "\r\n" + "CVE描述:" + "\r\n" + cve_zh if load_config()[0] == "dingding": @@ -403,24 +442,25 @@ if __name__ == '__main__': #初始化部分 create_database() - tools_list = load_tools_list() - tools_data = get_pushed_at_time(tools_list) - tools_insert_into_sqlite3(tools_data) while True: - try: - #CVE部分 - cve_data = getNews() - today_cve_data = get_today_cve_info(cve_data) - sendNews(today_cve_data) - cve_insert_into_sqlite3(today_cve_data) - #红队工具部分 - time.sleep(3) - tools_list = load_tools_list() - data2 = get_pushed_at_time(tools_list) - data3 = get_tools_update_list(data2) - for i in range(len(data3)): + tools_list = load_tools_list() + tools_data = get_pushed_at_time(tools_list) + tools_insert_into_sqlite3(tools_data) + #CVE部分 + cve_data = getNews() + today_cve_data = get_today_cve_info(cve_data) + sendNews(today_cve_data) + cve_insert_into_sqlite3(today_cve_data) + #红队工具部分 + time.sleep(3) + tools_list_new = load_tools_list() + data2 = get_pushed_at_time(tools_list_new) + data3 = get_tools_update_list(data2) + for i in range(len(data3)): + try: + logging.error("[+++] data3 数据 : api_url - > {} pushed_at - > {} tag_name - > {}".format(data3[i]['api_url'],data3[i]['pushed_at'],data3[i]['tag_name'])) send_dingding(data3[i]['api_url'],data3[i]['pushed_at'],data3[i]['tag_name']) - except Exception as e: - print("main 函数 遇到错误-->{}".format(e)) - logging.error("main 函数 遇到错误-->{}".format(e)) + except Exception as e: + print("main函数 try循环 遇到错误-->{}".format(e)) + logging.error("main函数 try循环 遇到错误-->{}".format(e))