#!/usr/bin/python3
# -*- coding:utf-8 -*-
# @Author : yhy&ddm&w4ter
# Watches GitHub for newly pushed CVE repositories and for updates to a list
# of red-team tools, and pushes notifications through DingTalk, ServerChan or
# a Telegram bot (the first channel enabled in config.yaml is used).
# Running it under `screen` on a Linux VPS keeps it alive in the background.
# https://my.oschina.net/u/4581868/blog/4380482
# https://github.com/kiang70/Github-Monitor
import datetime
import hashlib
import json
import logging
import re
import sqlite3
import time
from collections import OrderedDict
from contextlib import closing

import dingtalkchatbot.chatbot as cb
import requests
import yaml
from lxml import etree

# Kept at module level for backward compatibility; both lists are reset on
# every call of the function that fills them (see getNews and
# get_tools_update_list) so stale entries never leak into a later poll.
today_cve_info_tmp = []
tools_update_list = []

DB_PATH = 'data.db'
HTTP_TIMEOUT = 10
_DATE_RE = re.compile(r'\d{4}-\d{2}-\d{2}')
_CVE_ID_RE = re.compile(r'cve-\d+-\d+', re.IGNORECASE)

logging.basicConfig(level=logging.DEBUG, filename='run_info.log')


def load_config():
    """Read config.yaml and return the settings of the first enabled channel.

    Returns:
        ``(app_name, github_token, webhook, secretKey)`` for DingTalk,
        ``(app_name, github_token, sckey)`` for ServerChan, or
        ``(app_name, github_token, token, group_id)`` for Telegram.

    Raises:
        ValueError: if none of the three channels has ``enable`` set to 1.
    """
    with open('config.yaml', 'r', encoding='utf-8') as f:
        # NOTE: FullLoader is kept from the original; config.yaml is a local
        # operator-controlled file. Do not point this at untrusted input.
        config = yaml.load(f, Loader=yaml.FullLoader)
    all_config = config['all_config']
    github_token = all_config['github_token']

    dingding_cfg = all_config['dingding']
    if int(dingding_cfg[0]['enable']) == 1:
        return (dingding_cfg[3]['app_name'], github_token,
                dingding_cfg[1]['webhook'], dingding_cfg[2]['secretKey'])

    server_cfg = all_config['server']
    if int(server_cfg[0]['enable']) == 1:
        return server_cfg[2]['app_name'], github_token, server_cfg[1]['sckey']

    tgbot_cfg = all_config['tgbot']
    if int(tgbot_cfg[0]['enable']) == 1:
        return (tgbot_cfg[3]['app_name'], github_token,
                tgbot_cfg[1]['token'], tgbot_cfg[2]['group_id'])

    # The original fell through and returned None here, which crashed the
    # caller with an opaque TypeError; fail with the real reason instead.
    message = "[-] 配置文件有误,三个社交软件的enable不能为0"
    print(message)
    logging.error(message)
    raise ValueError(message)


github_headers = {
    # Personal access token from https://github.com/settings/tokens/new
    'Authorization': "token {}".format(load_config()[1])
}


def _extract_cve_id(name):
    """Return the upper-cased CVE id found in *name*, or None if there is none."""
    match = _CVE_ID_RE.search(name)
    return match.group(0).upper() if match else None


def _notify(text, body):
    """Send *text*/*body* through the configured channel.

    Returns the ``app_name`` that was used, or None when the configured
    ``app_name`` is not one of dingding/server/tgbot (nothing is sent then).
    """
    config = load_config()
    app_name = config[0]
    if app_name == "dingding":
        dingding(text, body, config[2], config[3])
    elif app_name == "server":
        server(text, body, config[2])
    elif app_name == "tgbot":
        tgbot(text, body, config[2], config[3])
    else:
        logging.warning("unknown app_name %r in config.yaml, notification not sent", app_name)
        return None
    return app_name


def create_database():
    """Create both monitoring tables if missing, then send a test notification."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        print("create_database 函数 连接数据库成功!")
        logging.info("create_database 函数 连接数据库成功!")
        cur = conn.cursor()
        try:
            cur.execute('''CREATE TABLE IF NOT EXISTS cve_monitor
                   (cve_name varchar(255),
                    pushed_at varchar(255),
                    cve_url varchar(255));''')
            print("成功创建CVE监控表")
            logging.info('成功创建CVE监控表')
            cur.execute('''CREATE TABLE IF NOT EXISTS redteam_tools_monitor
                   (tools_name varchar(255),
                    pushed_at varchar(255),
                    tag_name varchar(255));''')
            print("成功创建红队工具监控表")
            logging.info('成功创建红队工具监控表')
        except sqlite3.Error as e:
            print("创建cve监控表失败!报错:{}".format(e))
            logging.error("创建cve监控表失败!报错:{}".format(e))
        # An on-disk database needs an explicit commit.
        conn.commit()
    _notify("test", "连接成功")


def getNews():
    """Return today's CVE repositories among the 20 most recently updated.

    Searches GitHub for ``CVE-<current year>`` sorted by update time and keeps
    the entries whose ``pushed_at`` date equals today's local date, one per
    repository name.

    Returns:
        A list of dicts with keys ``cve_name`` (lower-cased repository name),
        ``cve_url`` and ``pushed_at``; an empty list when the request fails.
    """
    try:
        year = datetime.datetime.now().year
        api = "https://api.github.com/search/repositories?q=CVE-{}&sort=updated".format(year)
        json_str = requests.get(api, headers=github_headers, timeout=HTTP_TIMEOUT).json()
        today_date = str(datetime.date.today())
        # Start from a clean slate so results of earlier polls do not pile up.
        today_cve_info_tmp.clear()
        # Slicing copes with responses that hold fewer than 20 items.
        for repo in json_str['items'][:20]:
            cve_url = repo['html_url']
            cve_name = repo['name'].lower()
            pushed_at = _DATE_RE.findall(repo['pushed_at'])[0]
            if pushed_at == today_date:
                today_cve_info_tmp.append({"cve_name": cve_name, "cve_url": cve_url, "pushed_at": pushed_at})
            else:
                print("该{}的更新时间为{},不属于今天的CVE".format(cve_name, pushed_at))
                logging.info("该{}的更新时间为{},不属于今天的CVE".format(cve_name, pushed_at))
        # Keep the first entry for each repository name.
        today_cve_info = OrderedDict()
        for item in today_cve_info_tmp:
            today_cve_info.setdefault(item['cve_name'], {**item})
        return list(today_cve_info.values())
    except Exception as e:
        print(e, "github链接不通")
        logging.error("%s github链接不通", e)
        # An empty list lets the callers simply process nothing.
        return []


def cve_insert_into_sqlite3(data):
    """Store the CVE entries of *data* in the ``cve_monitor`` table.

    Entries whose name contains no CVE id are skipped; a failing insert is
    logged and does not stop the remaining entries.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        print("cve_insert_into_sqlite3 函数 打开数据库成功!")
        logging.info("cve_insert_into_sqlite3 函数 打开数据库成功!")
        cur = conn.cursor()
        for item in data:
            cve_name = _extract_cve_id(item['cve_name'])
            if cve_name is None:
                continue
            try:
                cur.execute(
                    "INSERT INTO cve_monitor (cve_name,pushed_at,cve_url) VALUES (?, ?, ?)",
                    (cve_name, item['pushed_at'], item['cve_url']))
            except sqlite3.Error as e:
                logging.error("cve_insert_into_sqlite3 函数: %s 插入数据失败: %s", cve_name, e)
                continue
            print("cve_insert_into_sqlite3 函数: {}插入数据成功!".format(cve_name))
            logging.info("cve_insert_into_sqlite3 函数: {}插入数据成功!".format(cve_name))
        conn.commit()


def query_cve_info_database(cve_name):
    """Return how many ``cve_monitor`` rows have exactly this *cve_name*."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cur = conn.cursor()
        cur.execute("SELECT cve_name FROM cve_monitor WHERE cve_name = ?;", (cve_name,))
        return len(cur.fetchall())


def get_today_cve_info(today_cve_info_data):
    """Return the entries of *today_cve_info_data* not yet stored in the database.

    The lookup uses the CVE id extracted from the repository name, which is
    the same value cve_insert_into_sqlite3 stores. Entries without a CVE id
    are dropped; sendNews and cve_insert_into_sqlite3 would skip them anyway.
    """
    today_all_cve_info = []
    for item in today_cve_info_data:
        cve_id = _extract_cve_id(item['cve_name'])
        if cve_id is None:
            continue
        if query_cve_info_database(cve_id) == 0:
            print("[+] 数据库里不存在{}".format(cve_id))
            logging.info("[+] 数据库里不存在{}".format(cve_id))
            today_all_cve_info.append(item)
        else:
            print("[-] 数据库里存在{}".format(cve_id))
            logging.info("[-] 数据库里存在{}".format(cve_id))
    return today_all_cve_info


def tools_insert_into_sqlite3(data):
    """Store red-team tool records that are not in the database yet.

    Each item of *data* needs the keys ``tools_name``, ``pushed_at`` and
    ``tag_name``. Tools that already have a row are left untouched so that
    restarting the script does not create duplicate rows.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        print("tools_insert_into_sqlite3 函数 打开数据库成功!")
        logging.info("tools_insert_into_sqlite3 函数 打开数据库成功!")
        cur = conn.cursor()
        for tool in data:
            cur.execute(
                "SELECT 1 FROM redteam_tools_monitor WHERE tools_name = ? LIMIT 1",
                (tool['tools_name'],))
            if cur.fetchone() is not None:
                continue
            cur.execute(
                "INSERT INTO redteam_tools_monitor (tools_name,pushed_at,tag_name) VALUES (?, ?, ?)",
                (tool['tools_name'], tool['pushed_at'], tool['tag_name']))
            print("tools_insert_into_sqlite3 函数: {}插入数据成功!".format(tool['tools_name']))
            logging.info("tools_insert_into_sqlite3 函数: {}插入数据成功!".format(tool['tools_name']))
        conn.commit()


def load_tools_list():
    """Return the ``tools_list`` entry of tools_list.yaml (empty list if unset)."""
    with open('tools_list.yaml', 'r', encoding='utf-8') as f:
        tools_config = yaml.load(f, Loader=yaml.FullLoader)
    return tools_config['tools_list'] or []


def get_pushed_at_time(tools_list):
    """Fetch name, last push date and latest release tag for every tool URL.

    Args:
        tools_list: iterable of GitHub repository API URLs.

    Returns:
        A list of dicts with keys ``tools_name``, ``pushed_at`` (YYYY-MM-DD),
        ``api_url`` and ``tag_name`` (``"no releases"`` when the repository
        has no release or the release lookup fails).
    """
    tools_info_list = []
    for url in tools_list:
        tools_json = requests.get(url, headers=github_headers, timeout=HTTP_TIMEOUT).json()
        pushed_at = _DATE_RE.findall(tools_json['pushed_at'])[0]
        tools_name = tools_json['name']
        api_url = tools_json['url']
        try:
            # The original requested url[0] + "/releases", i.e. "h/releases",
            # which always failed and made every tag "no releases".
            releases_json = requests.get(url + "/releases", headers=github_headers,
                                         timeout=HTTP_TIMEOUT).json()
            tag_name = releases_json[0]['tag_name']
        except (requests.RequestException, ValueError, LookupError, TypeError):
            tag_name = "no releases"
        tools_info_list.append({"tools_name": tools_name, "pushed_at": pushed_at,
                                "api_url": api_url, "tag_name": tag_name})
    return tools_info_list


def tools_query_sqlite3(tools_name):
    """Return ``(pushed_at, tag_name)`` stored for *tools_name*, or None if absent."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cur = conn.cursor()
        cur.execute(
            "SELECT pushed_at,tag_name FROM redteam_tools_monitor WHERE tools_name = ?;",
            (tools_name,))
        row = cur.fetchone()
    if row is None:
        return None
    return (row[0], row[1])


def get_tools_update_list(data):
    """Return the tools whose current push date differs from the stored one.

    Each returned dict carries ``api_url`` plus the ``pushed_at`` and
    ``tag_name`` values currently in the database. A tool that is not in the
    database yet (for example one just added to tools_list.yaml) is stored
    and not reported.
    """
    updated = []
    for dist in data:
        query_result = tools_query_sqlite3(dist['tools_name'])
        if query_result is None:
            logging.info("%s is not in the database yet, registering it", dist['tools_name'])
            tools_insert_into_sqlite3([dist])
            continue
        today_tools_pushed_at = query_result[0]
        if dist['pushed_at'] != today_tools_pushed_at:
            print("今日获取时间: ", dist['pushed_at'], "获取数据库时间: ", today_tools_pushed_at,
                  dist['tools_name'], "update!!!!")
            logging.info("今日获取时间: %s 获取数据库时间: %s %s update!!!!",
                         dist['pushed_at'], today_tools_pushed_at, dist['tools_name'])
            # Report the values stored in the database, not the fresh ones.
            updated.append({"api_url": dist['api_url'], "pushed_at": today_tools_pushed_at,
                            "tag_name": query_result[1]})
    # Replace, never extend: carrying over old entries would re-send every
    # past update on each polling round.
    tools_update_list[:] = updated
    return updated


def _latest_commit(url):
    """Return ``(html_url, message)`` of the newest commit of the repo at *url*."""
    commits_json = requests.get(url + '/commits', headers=github_headers,
                                timeout=HTTP_TIMEOUT).json()
    latest = commits_json[0]
    message = (latest.get('commit') or {}).get('message')
    return latest['html_url'], message or "作者未写更新内容,具体点击更新详情地址的URL进行查看"


def _update_tool_record(tools_name, pushed_at, tag_name=None):
    """Store the new push date (and the tag, when given) for *tools_name*."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cur = conn.cursor()
        if tag_name is not None:
            cur.execute("UPDATE redteam_tools_monitor SET tag_name = ? WHERE tools_name=?",
                        (tag_name, tools_name))
        cur.execute("UPDATE redteam_tools_monitor SET pushed_at = ? WHERE tools_name=?",
                    (pushed_at, tools_name))
        conn.commit()


def send_dingding(url, query_pushed_at, query_tag_name):
    """Notify about an updated tool and record its new state in the database.

    When the repository has releases and both the push date and the newest
    tag differ from the stored values, the release notes are sent; otherwise
    the newest commit message is sent.

    Args:
        url: GitHub API URL of the repository.
        query_pushed_at: push date currently stored in the database.
        query_tag_name: release tag currently stored in the database.
    """
    releases = requests.get(url + '/releases', headers=github_headers,
                            timeout=HTTP_TIMEOUT).json()
    repo_json = requests.get(url, headers=github_headers, timeout=HTTP_TIMEOUT).json()
    new_pushed_at = _DATE_RE.findall(repo_json['pushed_at'])[0]
    tools_name = url.split('/')[-1]

    if isinstance(releases, list) and releases:
        latest = releases[0]
        tag_name = latest['tag_name']
        print("[*] 数据库里的pushed_at -->", query_pushed_at, ";;;; api的pushed_at -->", new_pushed_at)
        logging.info("[*] 数据库里的pushed_at --> %s ;;;; api的pushed_at --> %s",
                     query_pushed_at, new_pushed_at)
        if query_pushed_at != new_pushed_at and tag_name != query_tag_name:
            # A release body may be missing or null; fall back to a placeholder.
            update_log = latest.get('body') or "作者未写更新内容"
            download_url = latest['html_url']
            text = r'** ' + tools_name + r' ** 工具,版本更新啦!'
            body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "工具更新日志:" + "\r\n" + update_log
            _notify(text, body)
            _update_tool_record(tools_name, new_pushed_at, tag_name)
            print("[+] tools_name -->", tools_name, "pushed_at 已更新,现在pushed_at 为 -->", new_pushed_at,
                  "tag_name 已更新,现在tag_name为 -->", tag_name)
            logging.info("[+] tools_name --> %s pushed_at 已更新,现在pushed_at 为 --> %s "
                         "tag_name 已更新,现在tag_name为 --> %s", tools_name, new_pushed_at, tag_name)
            return
        download_url, update_log = _latest_commit(url)
        text = r'** ' + tools_name + r' ** 工具小更新了一波!'
        body = "工具名称:" + tools_name + "\r\n" + "更新详情地址:" + download_url + "\r\n" + "commit更新日志:" + "\r\n" + update_log
    else:
        # No releases at all: describe the update through the newest commit.
        download_url, update_log = _latest_commit(url)
        text = r'** ' + tools_name + r' ** 工具更新啦!'
        body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "commit更新日志:" + "\r\n" + update_log

    _notify(text, body)
    _update_tool_record(tools_name, new_pushed_at)
    print("[+] tools_name -->", tools_name, "pushed_at 已更新,现在pushed_at 为 -->", new_pushed_at)
    logging.info("[+] tools_name --> %s pushed_at 已更新,现在pushed_at 为 --> %s",
                 tools_name, new_pushed_at)


def nmd5(str):
    """Return the hex MD5 digest of the UTF-8 encoding of *str*.

    The parameter name shadows the builtin; it is kept for compatibility.
    """
    return hashlib.md5(str.encode(encoding='utf-8')).hexdigest()


def translate(word):
    """Translate *word* with the Youdao web API and return the joined result.

    Raises:
        KeyError: when the response carries no ``translateResult``.
        requests.RequestException: on network failure or timeout.
    """
    headerstr = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36'
    bv = nmd5(headerstr)
    lts = str(round(time.time() * 1000))
    salt = lts + '90'
    # If translation fails with {'errorCode': 50}, look at fanyi.min.js:
    # https://shared.ydstatic.com/fanyi/newweb/v1.1.7/scripts/newweb/fanyi.min.js
    # Search for the fanyideskweb sign: n.md5("fanyideskweb" + e + i + "Y2FYu%TNSbMCxc3t2u^XT")
    # and replace the secret below if it has changed.
    strexample = 'fanyideskweb' + word + salt + 'Y2FYu%TNSbMCxc3t2u^XT'
    sign = nmd5(strexample)
    data = {
        'i': word,
        'from': 'AUTO',
        'to': 'AUTO',
        'smartresult': 'dict',
        'client': 'fanyideskweb',
        'salt': salt,
        'sign': sign,
        'lts': lts,
        'bv': bv,
        'doctype': 'json',
        'version': '2.1',
        'keyfrom': 'fanyi.web',
        'action': 'FY_BY_CLICKBUTTION',
    }
    url = 'http://fanyi.youdao.com/translate_o?smartresult=dict&smartresult=rule'
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36',
        'Referer': 'http://fanyi.youdao.com/',
        'Origin': 'http://fanyi.youdao.com',
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'X-Requested-With': 'XMLHttpRequest',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Connection': 'keep-alive',
        'Host': 'fanyi.youdao.com',
        'cookie': '_ntes_nnid=937f1c788f1e087cf91d616319dc536a,1564395185984; OUTFOX_SEARCH_USER_ID_NCOO=; OUTFOX_SEARCH_USER_ID=-10218418@11.136.67.24; JSESSIONID=; ___rl__test__cookies=1'
    }
    # A timeout keeps a stalled translation request from freezing the monitor.
    res = requests.post(url=url, data=data, headers=header, timeout=HTTP_TIMEOUT)
    result_dict = res.json()
    return "".join(segment['tgt'] for segment in result_dict['translateResult'][0])


def dingding(text, msg, webhook, secretKey):
    """Send ``text`` and ``msg`` as one text message through a DingTalk robot."""
    ding = cb.DingtalkChatbot(webhook, secret=secretKey)
    ding.send_text(msg='{}\r\n{}'.format(text, msg), is_at_all=False)


def server(text, msg, sckey):
    """Send a notification through ServerChan (http://sc.ftqq.com/?c=code)."""
    uri = 'https://sc.ftqq.com/{}.send'.format(sckey)
    # params= URL-encodes the text; the GitHub Authorization header is
    # deliberately not sent to this third-party host.
    requests.get(uri, params={'text': text, 'desp': msg}, timeout=HTTP_TIMEOUT)


def tgbot(text, msg, token, group_id):
    """Send ``text`` and ``msg`` as one message through a Telegram bot."""
    import telegram
    # The original passed 'xxx'.format(token), which is always the literal
    # 'xxx' and never the configured token.
    bot = telegram.Bot(token=token)
    bot.send_message(chat_id=group_id, text='{}\r\n{}'.format(text, msg))


def get_cve_des_zh(cve):
    """Fetch the description of *cve* from cve.mitre.org and translate it.

    Raises:
        IndexError: when the description cell is not found in the page.
    """
    time.sleep(3)
    query_cve_url = "https://cve.mitre.org/cgi-bin/cvename.cgi?name=" + cve
    # The GitHub token must not be sent to cve.mitre.org, so no
    # github_headers here.
    response = requests.get(query_cve_url, timeout=HTTP_TIMEOUT)
    html = etree.HTML(response.text)
    des = html.xpath('//*[@id="GeneratedTable"]/table//tr[4]/td/text()')[0].strip()
    return translate(des)


def sendNews(data):
    """Send one notification per CVE entry in *data*.

    Entries without a CVE id in their name are skipped. When the description
    cannot be fetched or translated, the notification is still sent with a
    placeholder description. A failure on one entry is logged and does not
    stop the remaining ones.
    """
    text = r'有新的CVE送达!'
    channel_labels = {"dingding": "钉钉", "server": "server酱", "tgbot": "tgbot"}
    for item in data:
        try:
            cve_name = _extract_cve_id(item['cve_name'])
            if cve_name is None:
                continue
            try:
                cve_zh = get_cve_des_zh(cve_name)
            except Exception as e:
                logging.warning("sendNews 函数: 获取 %s 描述失败: %s", cve_name, e)
                cve_zh = "获取CVE描述失败"
            body = "CVE编号:" + cve_name + "\r\n" + "Github地址:" + str(item['cve_url']) + "\r\n" + "CVE描述:" + "\r\n" + cve_zh
            app_name = _notify(text, body)
            if app_name in channel_labels:
                message = "{} 发送 CVE 成功".format(channel_labels[app_name])
                print(message)
                logging.info(message)
        except Exception as e:
            print("sendNews 函数 error:{}".format(e))
            logging.error("sendNews 函数 error:{}".format(e))


def main():
    """Initialise the database, then poll for CVEs and tool updates forever."""
    print("cve 和 github 发布工具 监控中 ...")
    logging.info("cve 和 github 发布工具 监控中 ...")
    # Initialisation
    create_database()
    tools_list = load_tools_list()
    tools_data = get_pushed_at_time(tools_list)
    tools_insert_into_sqlite3(tools_data)
    # NOTE(review): the original header promised a check every 3 minutes, but
    # the loop only pauses 3 seconds between its two halves. Timing is left
    # unchanged here; confirm the intended interval and GitHub rate limits.
    while True:
        try:
            # CVE part
            cve_data = getNews()
            today_cve_data = get_today_cve_info(cve_data)
            sendNews(today_cve_data)
            cve_insert_into_sqlite3(today_cve_data)
            # Red-team tools part
            time.sleep(3)
            tools_list = load_tools_list()
            current_tools = get_pushed_at_time(tools_list)
            for update in get_tools_update_list(current_tools):
                send_dingding(update['api_url'], update['pushed_at'], update['tag_name'])
        except Exception as e:
            print("main 函数 遇到错误-->{}".format(e))
            logging.error("main 函数 遇到错误-->{}".format(e))


if __name__ == '__main__':
    main()