diff --git a/github_cve_monitor.py b/github_cve_monitor.py index 5d332e7..bed72a8 100644 --- a/github_cve_monitor.py +++ b/github_cve_monitor.py @@ -8,67 +8,228 @@ # https://my.oschina.net/u/4581868/blog/4380482 # https://github.com/kiang70/Github-Monitor import json - +from collections import OrderedDict import requests, time, re import dingtalkchatbot.chatbot as cb import datetime import hashlib from lxml import etree -import traceback +import sqlite3 +file = "toollist.txt" github_headers = { - 'Authorization': "token xxxxxx" # 替换自己的github token https://github.com/settings/tokens/new + 'Authorization': "token ****" # 替换自己的github token https://github.com/settings/tokens/new } - - -# 抓取本年cve +today_cve_info_tmp = [] +tools_update_list = [] +#初始化创建数据库 +def create_database(): + conn = sqlite3.connect('data.db') + print("create_database 函数 连接数据库成功!") + cur = conn.cursor() + try: + cur.execute('''CREATE TABLE IF NOT EXISTS cve_monitor + (cve_name varchar(255), + pushed_at varchar(255), + cve_url varchar(255));''') + print("成功创建CVE监控表") + cur.execute('''CREATE TABLE IF NOT EXISTS redteam_tools_monitor + (tools_name varchar(255), + pushed_at varchar(255), + tag_name varchar(255));''') + print("成功创建红队工具监控表") + except Exception as e: + print("创建cve监控表失败!报错:{}".format(e)) + conn.commit() # 数据库存储在硬盘上需要commit 存储在内存中的数据库不需要 + conn.close() +# 根据排序获取本年前20条CVE def getNews(): try: # 抓取本年的 year = datetime.datetime.now().year api = "https://api.github.com/search/repositories?q=CVE-{}&sort=updated".format(year) json_str = requests.get(api, headers=github_headers, timeout=10).json() - cve_total_count = json_str['total_count'] - cve_description = json_str['items'][0]['description'] - cve_url = json_str['items'][0]['html_url'] - cve_name = json_str['items'][0]['name'] - return cve_total_count, cve_description, cve_url,cve_name + # cve_total_count = json_str['total_count'] + # cve_description = json_str['items'][0]['description'] + today_date = datetime.date.today() + for i in range(20): + cve_url = json_str['items'][i]['html_url'] + cve_name = json_str['items'][i]['name'].lower() + pushed_at_tmp = json_str['items'][i]['pushed_at'] + pushed_at = re.findall('\d{4}-\d{2}-\d{2}', pushed_at_tmp)[0] + # today_cve_info_tmp.append({"cve_name": cve_name, "cve_url": cve_url, "pushed_at": pushed_at}) + if pushed_at == str(today_date): + today_cve_info_tmp.append({"cve_name":cve_name,"cve_url":cve_url,"pushed_at":pushed_at}) + # print(today_cve_info) + today_cve_info = OrderedDict() + for item in today_cve_info_tmp: + today_cve_info.setdefault(item['cve_name'], {**item, }) + today_cve_info = list(today_cve_info.values()) + + return today_cve_info + # return cve_total_count, cve_description, cve_url, cve_name + #\d{4}-\d{2}-\d{2} except Exception as e: print(e, "github链接不通") return '', '', '' - - -# 通过 pushed_at 检查工具是否更新 +#获取到的CVE信息插入到数据库 +def cve_insert_into_sqlite3(data): + conn = sqlite3.connect('data.db') + print("cve_insert_into_sqlite3 函数 打开数据库成功!") + cur = conn.cursor() + for i in range(len(data)): + try: + cve_name = re.findall('(cve\-\d+\-\d+)', data[i]['cve_name'])[0].upper() + cur.execute("INSERT INTO cve_monitor (cve_name,pushed_at,cve_url) VALUES ('{}', '{}', '{}')".format(cve_name, data[i]['pushed_at'], data[i]['cve_url'])) + except Exception as e: + pass + conn.commit() + print("cve_insert_into_sqlite3 函数 插入数据成功!") + conn.close() +#查询数据库里是否存在该CVE的方法 +def query_cve_info_database(cve_name): + conn = sqlite3.connect('data.db') + cur = conn.cursor() + sql_grammar = "SELECT cve_name FROM cve_monitor WHERE cve_name = '{}';".format(cve_name) + cursor = cur.execute(sql_grammar) + return len(list(cursor)) +#获取不存在数据库里的CVE信息 +def get_today_cve_info(today_cve_info_data): + today_all_cve_info = [] + # today_cve_info_data = getNews() + for i in range(len(today_cve_info_data)): + today_cve_name = today_cve_info_data[i]['cve_name'] + Verify = query_cve_info_database(today_cve_name.upper()) + if Verify == 0: + print("[+] 数据库里不存在{}".format(today_cve_name.upper())) + today_all_cve_info.append(today_cve_info_data[i]) + else: + print("[-] 数据库里存在{}".format(today_cve_name.upper())) + return today_all_cve_info +#获取红队工具信息插入到数据库 +def tools_insert_into_sqlite3(data): + conn = sqlite3.connect('data.db') + print("tools_insert_into_sqlite3 函数 打开数据库成功!") + cur = conn.cursor() + for i in range(len(data)): + cur.execute("INSERT INTO redteam_tools_monitor (tools_name,pushed_at,tag_name) VALUES ('{}', '{}','{}')".format(data[i]['tools_name'], data[i]['pushed_at'],data[i]['tag_name'])) + conn.commit() + print("tools_insert_into_sqlite3 函数 插入数据成功!") + conn.close() +#读取本地红队工具链接文件转换成list +def timing_update_tools_list(file): + result = [] + with open(file,'r') as f: + for line in f: + result.append(list(line.strip('\n').split(','))) + return result +#获取红队工具的名称,更新时间,版本名称信息 def get_pushed_at_time(tools_list): - total_list = [] + tools_info_list = [] for url in tools_list: - pushed_at = requests.get(url, headers=github_headers, timeout=10).json()['pushed_at'] - total_list.append(pushed_at) + tools_json = requests.get(url[0], headers=github_headers, timeout=10).json() + pushed_at_tmp = tools_json['pushed_at'] + pushed_at = re.findall('\d{4}-\d{2}-\d{2}', pushed_at_tmp)[0] + tools_name = tools_json['name'] + api_url = tools_json['url'] + try: + releases_json = requests.get(url[0]+"/releases", headers=github_headers, timeout=10).json() + tag_name = releases_json[0]['tag_name'] + except Exception as e: + tag_name = "no releases" + tools_info_list.append({"tools_name":tools_name,"pushed_at":pushed_at,"api_url":api_url,"tag_name":tag_name}) - return total_list - - -def get_update_log(url): + return tools_info_list +#根据红队名名称查询数据库红队工具的更新时间以及版本名称并返回 +def tools_query_sqlite3(tools_name): + conn = sqlite3.connect('data.db') + cur = conn.cursor() + sql_grammar = "SELECT pushed_at,tag_name FROM redteam_tools_monitor WHERE tools_name = '{}';".format(tools_name) + cursor = cur.execute(sql_grammar) + for result in cursor: + return (result[0],result[1]) + conn.close() +#获取更新了的红队工具信息 +def get_tools_update_list(data): + for dist in data: + query_result = tools_query_sqlite3(dist['tools_name']) + today_tools_pushed_at = query_result[0] + if dist['pushed_at'] != today_tools_pushed_at: + print("今日获取时间: ",dist['pushed_at'],"获取数据库时间: ",today_tools_pushed_at,dist['tools_name'],"update!!!!") + #返回数据库里面的时间和版本 + tools_update_list.append({"api_url":dist['api_url'],"pushed_at":today_tools_pushed_at,"tag_name":query_result[1]}) + return tools_update_list +#获取更新信息并发送到钉钉 +def send_dingding(url,query_pushed_at,query_tag_name): # 考虑到有的工具没有 releases, 则通过 commits 记录获取更新描述 - # 判断是否有 releases 记录 json_str = requests.get(url + '/releases', headers=github_headers, timeout=10).json() + new_pushed_at = re.findall('\d{4}-\d{2}-\d{2}', requests.get(url, headers=github_headers, timeout=10).json()['pushed_at'])[0] if len(json_str) != 0: - try: - update_log = json_str[0]['body'] - except Exception as e: - update_log = "作者未写更新内容" - download_url = json_str[0]['html_url'] - tools_version = json_str[0]['name'] - return update_log, download_url,len(json_str),tools_version + release_published_at_tmp = json_str[0]['published_at'] + release_published_at = re.findall('\d{4}-\d{2}-\d{2}', release_published_at_tmp)[0] + tag_name = json_str[0]['tag_name'] + + print("[*] 数据库里的pushed_at -->",query_pushed_at,";;;; api的pushed_at -->",new_pushed_at) + if query_pushed_at != new_pushed_at and tag_name != query_tag_name: + try: + update_log = json_str[0]['body'] + except Exception as e: + update_log = "作者未写更新内容" + download_url = json_str[0]['html_url'] + tools_name = url.split('/')[-1] + text = r'** ' + tools_name + r' ** 工具,版本更新啦!' + body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "工具更新日志:" + "\r\n" + update_log + dingding(text, body) + conn = sqlite3.connect('data.db') + cur = conn.cursor() + sql_grammar = "UPDATE redteam_tools_monitor SET tag_name = '{}' WHERE tools_name='{}'".format(tag_name,tools_name) + sql_grammar1 = "UPDATE redteam_tools_monitor SET pushed_at = '{}' WHERE tools_name='{}'".format(new_pushed_at, tools_name) + cur.execute(sql_grammar) + cur.execute(sql_grammar1) + conn.commit() + conn.close() + print("[+] tools_name -->", tools_name, "pushed_at 已更新,现在pushed_at 为 -->", new_pushed_at,"tag_name 已更新,现在tag_name为 -->",tag_name) + else: + commits_url = url + "/commits" + commits_url_response_json = requests.get(commits_url).text + commits_json = json.loads(commits_url_response_json) + tools_name = url.split('/')[-1] + download_url = commits_json[0]['html_url'] + try: + update_log = commits_json[0]['commit']['message'] + except Exception as e: + update_log = "作者未写更新内容,具体点击更新详情地址的URL进行查看" + text = r'** ' + tools_name + r' ** 工具小更新了一波!' + body = "工具名称:" + tools_name + "\r\n" + "更新详情地址:" + download_url + "\r\n" + "commit更新日志:" + "\r\n" + update_log + dingding(text, body) + conn = sqlite3.connect('data.db') + cur = conn.cursor() + sql_grammar = "UPDATE redteam_tools_monitor SET pushed_at = '{}' WHERE tools_name='{}'".format(new_pushed_at,tools_name) + cur.execute(sql_grammar) + conn.commit() + conn.close() + print("[+] tools_name -->",tools_name,"pushed_at 已更新,现在pushed_at 为 -->",new_pushed_at) + + # return update_log, download_url, tools_version else: json_str = requests.get(url + '/commits', headers=github_headers, timeout=10).json() update_log = json_str[0]['commit']['message'] download_url = json_str[0]['html_url'] - return update_log, download_url - + tools_name = url.split('/')[-1] + text = r'** ' + tools_name + r' ** 工具更新啦!' + body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "commit更新日志:" + "\r\n" + update_log + dingding(text, body) + conn = sqlite3.connect('data.db') + cur = conn.cursor() + sql_grammar = "UPDATE redteam_tools_monitor SET pushed_at = '{}' WHERE tools_name='{}'".format(new_pushed_at,tools_name) + cur.execute(sql_grammar) + conn.commit() + conn.close() + print("[+] tools_name -->", tools_name, "pushed_at 已更新,现在pushed_at 为 -->", new_pushed_at) + # return update_log, download_url # 创建md5对象 def nmd5(str): m = hashlib.md5() @@ -76,8 +237,6 @@ def nmd5(str): m.update(b) str_md5 = m.hexdigest() return str_md5 - - # 有道翻译 def translate(word): headerstr = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36' @@ -129,159 +288,70 @@ def translate(word): tgt = json_str['tgt'] result += tgt return result - - # 钉钉 def dingding(text, msg): - # 将此处换为钉钉机器人的api - webhook = 'xxxxxxxx' - secretKey = 'xxxxxxxx' # 替换自己的加签, 钉钉中机器人管理 - 加签 双击,右键复制 + webhook = '*****' # 将此处换为钉钉机器人的api + secretKey = '****' # 替换自己的加签, 钉钉中机器人管理 - 加签 双击,右键复制 ding = cb.DingtalkChatbot(webhook, secret=secretKey) ding.send_text(msg='{}\r\n{}'.format(text, msg), is_at_all=False) - - # server酱 http://sc.ftqq.com/?c=code def server(text, msg): - # 将 xxxx 换成自己的server SCKEY - uri = 'https://sc.ftqq.com/xxxx.send?text={}&desp={}'.format(text, msg) + uri = 'https://sc.ftqq.com/xxxx.send?text={}&desp={}'.format(text, msg)# 将 xxxx 换成自己的server SCKEY requests.get(uri, headers=github_headers, timeout=10) - - # 添加Telegram Bot推送支持 def tgbot(text, msg): import telegram - # Your Telegram Bot Token - bot = telegram.Bot(token='123456:aaa-sdasdsa') + bot = telegram.Bot(token='123456:aaa-sdasdsa')# Your Telegram Bot Token group_id = 'Your Group ID' bot.send_message(chat_id=group_id, text='{}\r\n{}'.format(text, msg)) - - -# 通过检查name 和 description 中是否存在test字样,排除test -def regular(req): - cve_name = req['items'][0]['name'] - cve_description = req['items'][0]['description'] - if cve_name.lower().find('test') == -1 and cve_description.lower().find('test') == -1: - return True - return False - - # 根据cve 名字,获取描述,并翻译 def get_cve_des_zh(cve): + time.sleep(3) query_cve_url = "https://cve.mitre.org/cgi-bin/cvename.cgi?name=" + cve response = requests.get(query_cve_url, headers=github_headers, timeout=10) html = etree.HTML(response.text) des = html.xpath('//*[@id="GeneratedTable"]/table//tr[4]/td/text()')[0].strip() return translate(des) - - -def sendNews(tools_list): - while True: - try: - print("cve 和 github 发布工具 监控中 ...") - # 抓取本年的cve - year = datetime.datetime.now().year - api = "https://api.github.com/search/repositories?q=CVE-{}&sort=updated".format(year) - # 请求API - req = requests.get(api, headers=github_headers, timeout=10).json() - total_count = req['total_count'] - - # 通过 pushed_at 检查工具是否更新 - time_list1 = get_pushed_at_time(tools_list) - - # 监控时间间隔3分钟 - time.sleep(180) - - # 检查name 和 description 中是否存在test字样 和 是否更新 - if regular(req) and total_count != getNews()[0]: - # 推送正文内容 - # 推送标题 - text = r'有新的CVE送达!' - # 获取 cve 名字 ,根据cve 名字,获取描述,并翻译 - cve_name = re.findall('(CVE\-\d+\-\d+)', getNews()[3])[0].upper() +#发送CVE信息到钉钉 +def sendNews(data): + try: + text = r'有新的CVE送达!' + # 获取 cve 名字 ,根据cve 名字,获取描述,并翻译 + for i in range(len(data)): + try: + cve_name = re.findall('(cve\-\d+\-\d+)', data[i]['cve_name'])[0].upper() cve_zh = get_cve_des_zh(cve_name) - msg = "CVE编号:" + cve_name + "\r\n" + "Github地址:" + str(getNews()[2]) + "\r\n" + "CVE描述:" +"\r\n"+ cve_zh - - # 三选一即可,没配置的 注释或者删掉 - # server(text, msg) + msg = "CVE编号:" + cve_name + "\r\n" + "Github地址:" + str(data[i]['cve_url']) + "\r\n" + "CVE描述:" + "\r\n" + cve_zh dingding(text, msg) + print("钉钉 发送 CVE 成功") + # server(text, msg) # tgbot(text,msg) - print(msg) - - time_list2 = get_pushed_at_time(tools_list) - - for i in range(len(tools_list)): - # 两次时间不相等,则代表工具更新 - if time_list1[i] != time_list2[i]: - # get_update_log_info = get_update_log(tools_list[i]) - if len(get_update_log(tools_list[i])) == 2: - update_log = get_update_log(tools_list[i])[0] - download_url = get_update_log(tools_list[i])[1] - tools_name = tools_list[i].split('/')[-1] - text = r'** ' + tools_name + r' ** 工具更新啦!' - body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "工具更新日志:" + "\r\n" + update_log - # 三选一即可,没配置的 注释或者删掉 - # server(text, body) - dingding(text, body) - # tgbot(text,body) - print(body) - elif len(get_update_log(tools_list[i])) == 4: - one_all_info = get_update_log(tools_list[i]) - release_len_one = one_all_info[2] - time.sleep(120) - two_all_info = get_update_log(tools_list[i]) - release_len_two = two_all_info[2] - if release_len_one != release_len_two: - update_log = two_all_info[0] - download_url = two_all_info[1] - tools_version = two_all_info[3] - tools_name = tools_list[i].split('/')[-1] - text = r'** ' + tools_name + r' ** 工具版本更新啦!' - body = "工具名称:" + tools_name + "\r\n"+"当前最新版本:"+tools_version+"\r\n"+ "工具下载地址:" + download_url + "\r\n" + "工具更新日志:" + "\r\n" + update_log - # 三选一即可,没配置的 注释或者删掉 - # server(text, body) - dingding(text, body) - # tgbot(text,body) - else: - commits_url = tools_list[i]+"/commits" - commits_url_response_json = requests.get(commits_url).text - commits_json = json.loads(commits_url_response_json) - tools_name = tools_list[i].split('/')[-1] - download_url = commits_json[0]['html_url'] - try: - update_log = commits_json[0]['commit']['message'] - except Exception as e: - update_log = "作者未写更新内容,具体点击更新详情地址的URL进行查看" - text = r'** ' + tools_name + r' ** 工具小更新了一波!' - body = "工具名称:" + tools_name + "\r\n" + "更新详情地址:" + download_url + "\r\n" + "工具更新日志:" + "\r\n" + update_log - # 三选一即可,没配置的 注释或者删掉 - # server(text, body) - dingding(text, body) - # tgbot(text,body) - - - - - - except Exception as e: - print("Program runing error:{}".format(traceback.print_exc())) - - + except IndexError: + pass + except Exception as e: + print("Program runing error:{}".format(e)) +#main函数 if __name__ == '__main__': - tools_list = [ - "https://api.github.com/repos/BeichenDream/Godzilla", - "https://api.github.com/repos/rebeyond/Behinder", - "https://api.github.com/repos/AntSwordProject/antSword", - "https://api.github.com/repos/j1anFen/shiro_attack", - "https://api.github.com/repos/yhy0/ExpDemo-JavaFX", - "https://api.github.com/repos/yhy0/github-cve-monitor", - "https://api.github.com/repos/gentilkiwi/mimikatz", - "https://api.github.com/repos/ehang-io/nps", - "https://api.github.com/repos/chaitin/xray", - "https://api.github.com/repos/FunnyWolf/pystinger", - "https://api.github.com/repos/L-codes/Neo-reGeorg", - "https://api.github.com/repos/shadow1ng/fscan", - "https://api.github.com/repos/SafeGroceryStore/MDUT", - "https://api.github.com/repos/EdgeSecurityTeam/Vulnerability", - ] - - sendNews(tools_list) + print("cve 和 github 发布工具 监控中 ...") + try: + #初始化部分 + create_database() + tools_list = timing_update_tools_list(file) + tools_data = get_pushed_at_time(tools_list) + tools_insert_into_sqlite3(tools_data) + + while True: + #CVE部分 + cve_data = getNews() + today_cve_data = get_today_cve_info(cve_data) + sendNews(today_cve_data) + cve_insert_into_sqlite3(today_cve_data) + #红队工具部分 + time.sleep(3) + tools_list = timing_update_tools_list(file) + data2 = get_pushed_at_time(tools_list) + data3 = get_tools_update_list(data2) + for i in range(len(data3)): + send_dingding(data3[i]['api_url'],data3[i]['pushed_at'],data3[i]['tag_name']) + except Exception as e: + print("main 函数 遇到错误-->{}".format(e))