github-cve-monitor/github_cve_monitor.py
wuyoukm d199c260a6
Update github_cve_monitor.py
配置文件标准化,修复BUG
2021-08-27 17:30:36 +08:00

427 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python3
# -*- coding:utf-8 -*-
# @Author : yhy&ddm&w4ter
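# Checks GitHub every 3 minutes for newly pushed CVE repositories; if there are any, pushes them via ServerChan (server酱) or a DingTalk robot (configuring either one is enough).
# Running it with the `screen` command in the background of your own Linux VPS is recommended, so CVE notifications keep arriving.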
# https://my.oschina.net/u/4581868/blog/4380482
# https://github.com/kiang70/Github-Monitor
import json
from collections import OrderedDict
import requests, time, re
import dingtalkchatbot.chatbot as cb
import datetime
import hashlib
import yaml
from lxml import etree
import sqlite3
import logging
# Module-level list that getNews() appends today's CVE repositories to.
today_cve_info_tmp = []
# Module-level list that get_tools_update_list() appends changed tools to.
tools_update_list = []
# Write log records of level DEBUG and above to run_info.log.
logging.basicConfig(level=logging.DEBUG, filename='run_info.log')
# Load the configuration file
def load_config():
    """Read ``config.yaml`` and return the settings of the first enabled channel.

    Channels are checked in the order dingding, server, tgbot; the first one
    whose ``enable`` value equals 1 is used.

    Returns:
        tuple: ``(app_name, github_token, webhook, secretKey)`` for dingding,
        ``(app_name, github_token, sckey)`` for server, or
        ``(app_name, github_token, token, group_id)`` for tgbot.

    Raises:
        ValueError: if none of the three channels is enabled.
    """
    # YAML streams are expected to be UTF-8; do not depend on the OS locale.
    with open('config.yaml', 'r', encoding='utf-8') as f:
        # NOTE(review): FullLoader is kept as in the original; yaml.safe_load
        # would be stricter if this file could ever come from an untrusted source.
        config = yaml.load(f, Loader=yaml.FullLoader)

    settings = config['all_config']
    github_token = settings['github_token']

    dingding_cfg = settings['dingding']
    if int(dingding_cfg[0]['enable']) == 1:
        dingding_webhook = dingding_cfg[1]['webhook']
        dingding_secretKey = dingding_cfg[2]['secretKey']
        app_name = dingding_cfg[3]['app_name']
        return app_name, github_token, dingding_webhook, dingding_secretKey

    server_cfg = settings['server']
    if int(server_cfg[0]['enable']) == 1:
        server_sckey = server_cfg[1]['sckey']
        app_name = server_cfg[2]['app_name']
        return app_name, github_token, server_sckey

    tgbot_cfg = settings['tgbot']
    if int(tgbot_cfg[0]['enable']) == 1:
        tgbot_token = tgbot_cfg[1]['token']
        tgbot_group_id = tgbot_cfg[2]['group_id']
        app_name = tgbot_cfg[3]['app_name']
        return app_name, github_token, tgbot_token, tgbot_group_id

    # Nothing is enabled: fail loudly here instead of returning None and
    # letting the caller die with "'NoneType' object is not subscriptable".
    message = "[-] 配置文件有误三个社交软件的enable不能为0"
    print(message)
    logging.error(message)
    raise ValueError(message)
# HTTP headers used for GitHub API requests; the token is element [1] of the
# tuple returned by load_config(), which is evaluated once at import time.
github_headers = {
'Authorization': "token {}".format(load_config()[1]) # use your own GitHub token: https://github.com/settings/tokens/new
}
# Initialise the database
def create_database():
    """Create the SQLite tables if they are missing, then send a test message.

    Opens ``data.db``, creates the ``cve_monitor`` and ``redteam_tools_monitor``
    tables when they do not exist, and finally pushes a "connection ok" test
    message through the channel named by ``load_config()[0]``.
    """
    conn = sqlite3.connect('data.db')
    print("create_database 函数 连接数据库成功!")
    logging.info("create_database 函数 连接数据库成功!")
    try:
        cur = conn.cursor()
        try:
            cur.execute('''CREATE TABLE IF NOT EXISTS cve_monitor
                (cve_name varchar(255),
                pushed_at varchar(255),
                cve_url varchar(255));''')
            print("成功创建CVE监控表")
            logging.info('成功创建CVE监控表')
            cur.execute('''CREATE TABLE IF NOT EXISTS redteam_tools_monitor
                (tools_name varchar(255),
                pushed_at varchar(255),
                tag_name varchar(255));''')
            print("成功创建红队工具监控表")
            logging.info('成功创建红队工具监控表')
        except sqlite3.Error as e:
            print("创建cve监控表失败报错{}".format(e))
            logging.error("创建cve监控表失败报错{}".format(e))
        # An on-disk database needs an explicit commit.
        conn.commit()
    finally:
        # Always release the connection, even if something above failed.
        conn.close()

    # Read the configuration once instead of re-parsing the file per access.
    config = load_config()
    channel = config[0]
    if channel == "dingding":
        dingding("test", "连接成功", config[2], config[3])
    elif channel == "server":
        server("test", "连接成功", config[2])
    elif channel == "tgbot":
        tgbot("test", "连接成功", config[2], config[3])
# Fetch up to the 20 most recently updated CVE repositories of the current year
def getNews():
    """Return the CVE repositories of this year that were pushed today.

    Queries the GitHub repository search API for ``CVE-<current year>`` sorted
    by update time and inspects at most the first 20 results. Repositories
    whose ``pushed_at`` date is today are remembered in the module-level
    ``today_cve_info_tmp`` list (one entry per repository name).

    Returns:
        list[dict]: copies of the remembered entries, each with the keys
        ``cve_name`` (lower-cased repository name), ``cve_url`` and
        ``pushed_at`` (``YYYY-MM-DD``). An empty list is returned when the
        request or the parsing of the response fails.
    """
    try:
        # Only look at the current year.
        year = datetime.datetime.now().year
        api = "https://api.github.com/search/repositories?q=CVE-{}&sort=updated".format(year)
        json_str = requests.get(api, headers=github_headers, timeout=10).json()
        today_date = str(datetime.date.today())

        # Drop entries from earlier days and never store a name twice, so the
        # module-level cache cannot grow without bound in a long-running process.
        today_cve_info_tmp[:] = [
            entry for entry in today_cve_info_tmp if entry['pushed_at'] == today_date
        ]
        seen_names = {entry['cve_name'] for entry in today_cve_info_tmp}

        # Slicing copes with searches that return fewer than 20 repositories.
        for repo in json_str['items'][:20]:
            cve_url = repo['html_url']
            cve_name = repo['name'].lower()
            date_match = re.search(r'\d{4}-\d{2}-\d{2}', repo.get('pushed_at') or '')
            if date_match is None:
                # No usable push date; skip this repository only.
                continue
            pushed_at = date_match.group(0)
            if pushed_at != today_date:
                print("{}的更新时间为{}不属于今天的CVE".format(cve_name, pushed_at))
                logging.info("{}的更新时间为{}不属于今天的CVE".format(cve_name, pushed_at))
                continue
            if cve_name not in seen_names:
                seen_names.add(cve_name)
                today_cve_info_tmp.append(
                    {"cve_name": cve_name, "cve_url": cve_url, "pushed_at": pushed_at}
                )
        return [dict(entry) for entry in today_cve_info_tmp]
    except Exception as e:
        print(e, "github链接不通")
        # logging takes a format string plus arguments, not print-style varargs.
        logging.error("%s github链接不通", e)
        # An empty list keeps the return type consistent for callers that iterate it.
        return []
# Insert the fetched CVE information into the database
def cve_insert_into_sqlite3(data):
    """Store CVE entries in the ``cve_monitor`` table of ``data.db``.

    Args:
        data: iterable of dicts with the keys ``cve_name``, ``pushed_at`` and
            ``cve_url``. The CVE id (``cve-<digits>-<digits>``) is extracted
            from ``cve_name`` and stored upper-cased; entries without such an
            id are skipped.
    """
    conn = sqlite3.connect('data.db')
    print("cve_insert_into_sqlite3 函数 打开数据库成功!")
    logging.info("cve_insert_into_sqlite3 函数 打开数据库成功!")
    try:
        cur = conn.cursor()
        for item in data:
            try:
                id_match = re.search(r'(cve-\d+-\d+)', item['cve_name'])
                if id_match is None:
                    # The repository name carries no CVE id; nothing to store.
                    continue
                cve_name = id_match.group(1).upper()
                # Placeholders keep repository data from being parsed as SQL.
                cur.execute(
                    "INSERT INTO cve_monitor (cve_name,pushed_at,cve_url) VALUES (?, ?, ?)",
                    (cve_name, item['pushed_at'], item['cve_url']),
                )
            except (KeyError, TypeError, sqlite3.Error) as e:
                # Best effort per entry, but leave a trace instead of passing silently.
                logging.error("cve_insert_into_sqlite3: insert failed: %s", e)
                continue
            print("cve_insert_into_sqlite3 函数: {}插入数据成功!".format(cve_name))
            logging.info("cve_insert_into_sqlite3 函数: {}插入数据成功!".format(cve_name))
        conn.commit()
    finally:
        conn.close()
# Check whether a CVE already exists in the database
def query_cve_info_database(cve_name):
    """Return how many ``cve_monitor`` rows in ``data.db`` have this ``cve_name``.

    Args:
        cve_name: value compared against the ``cve_name`` column.

    Returns:
        int: number of matching rows (0 when the CVE is unknown).
    """
    conn = sqlite3.connect('data.db')
    try:
        # A bound parameter avoids building SQL out of repository names.
        rows = conn.execute(
            "SELECT cve_name FROM cve_monitor WHERE cve_name = ?;", (cve_name,)
        ).fetchall()
    finally:
        # The original never closed this connection.
        conn.close()
    return len(rows)
# Collect the CVE entries that are not in the database yet
def get_today_cve_info(today_cve_info_data):
    """Filter out the CVE entries that are already stored in the database.

    Args:
        today_cve_info_data: sequence of dicts as produced by ``getNews()``;
            only the ``cve_name`` key is read here.

    Returns:
        list: the entries for which ``query_cve_info_database`` reports no row.
    """
    today_all_cve_info = []
    for item in today_cve_info_data:
        today_cve_name = item['cve_name']
        display_name = today_cve_name.upper()
        # The table stores the extracted CVE id (see cve_insert_into_sqlite3),
        # not the full repository name. Look up by that id so a repository such
        # as "cve-2021-1234-poc" is recognised once stored instead of being
        # reported again on every polling cycle.
        id_match = re.search(r'(cve-\d+-\d+)', today_cve_name)
        lookup_name = id_match.group(1).upper() if id_match else display_name
        if query_cve_info_database(lookup_name) == 0:
            print("[+] 数据库里不存在{}".format(display_name))
            logging.info("[+] 数据库里不存在{}".format(display_name))
            today_all_cve_info.append(item)
        else:
            print("[-] 数据库里存在{}".format(display_name))
            logging.info("[-] 数据库里存在{}".format(display_name))
    return today_all_cve_info
# Insert red-team tool information into the database
def tools_insert_into_sqlite3(data):
    """Insert tool records into the ``redteam_tools_monitor`` table of ``data.db``.

    Args:
        data: iterable of dicts with the keys ``tools_name``, ``pushed_at``
            and ``tag_name``.
    """
    conn = sqlite3.connect('data.db')
    print("tools_insert_into_sqlite3 函数 打开数据库成功!")
    logging.info("tools_insert_into_sqlite3 函数 打开数据库成功!")
    try:
        cur = conn.cursor()
        for tool in data:
            # Placeholders: tag names come from GitHub and may contain quotes.
            cur.execute(
                "INSERT INTO redteam_tools_monitor (tools_name,pushed_at,tag_name) VALUES (?, ?, ?)",
                (tool['tools_name'], tool['pushed_at'], tool['tag_name']),
            )
            print("tools_insert_into_sqlite3 函数: {}插入数据成功!".format(tool['tools_name']))
            logging.info("tools_insert_into_sqlite3 函数: {}插入数据成功!".format(tool['tools_name']))
        conn.commit()
    finally:
        conn.close()
# Read the local red-team tool link file and return it as a list
def load_tools_list():
    """Return the ``tools_list`` entry of ``tools_list.yaml``.

    Returns:
        The value stored under the top-level ``tools_list`` key
        (presumably a list of GitHub API repository URLs; verify against
        the YAML file).
    """
    # Explicit encoding so parsing does not depend on the OS locale.
    with open('tools_list.yaml', 'r', encoding='utf-8') as f:
        # Do not name this ``list``: that would shadow the builtin.
        tools_config = yaml.load(f, Loader=yaml.FullLoader)
    return tools_config['tools_list']
# Fetch name, last push date and latest release tag of each red-team tool
def get_pushed_at_time(tools_list):
    """Query GitHub for the current state of every tool in ``tools_list``.

    Args:
        tools_list: iterable of URLs that are requested with ``requests.get``
            and expected to return JSON containing ``pushed_at``, ``name``
            and ``url``.

    Returns:
        list[dict]: one dict per tool with the keys ``tools_name``,
        ``pushed_at`` (``YYYY-MM-DD``), ``api_url`` and ``tag_name``
        (``"no releases"`` when no release tag could be read).
    """
    tools_info_list = []
    for url in tools_list:
        tools_json = requests.get(url, headers=github_headers, timeout=10).json()
        pushed_at = re.findall(r'\d{4}-\d{2}-\d{2}', tools_json['pushed_at'])[0]
        tools_name = tools_json['name']
        api_url = tools_json['url']
        try:
            # The original used url[0], i.e. only the first character of the
            # URL, so this request always failed and every tool was recorded
            # as having no releases.
            releases_json = requests.get(url + "/releases", headers=github_headers, timeout=10).json()
            tag_name = releases_json[0]['tag_name']
        except (requests.RequestException, ValueError, LookupError, TypeError):
            tag_name = "no releases"
        tools_info_list.append({
            "tools_name": tools_name,
            "pushed_at": pushed_at,
            "api_url": api_url,
            "tag_name": tag_name,
        })
    return tools_info_list
# Look up the stored push date and release tag of a red-team tool by name
def tools_query_sqlite3(tools_name):
    """Return the stored ``(pushed_at, tag_name)`` of a tool.

    Args:
        tools_name: value compared against the ``tools_name`` column of
            ``redteam_tools_monitor``.

    Returns:
        tuple | None: ``(pushed_at, tag_name)`` of the first matching row, or
        ``None`` when the tool is not in the table.
    """
    conn = sqlite3.connect('data.db')
    try:
        row = conn.execute(
            "SELECT pushed_at,tag_name FROM redteam_tools_monitor WHERE tools_name = ?;",
            (tools_name,),
        ).fetchone()
    finally:
        # The original returned from inside the loop and never reached close().
        conn.close()
    if row is None:
        return None
    return (row[0], row[1])
# Collect the red-team tools that changed since the last stored state
def get_tools_update_list(data):
    """Return the tools whose ``pushed_at`` differs from the stored value.

    Args:
        data: iterable of dicts as produced by ``get_pushed_at_time`` (keys
            ``tools_name``, ``pushed_at``, ``api_url``, ``tag_name``).

    Returns:
        list[dict]: the module-level ``tools_update_list``, refilled on every
        call. Each entry has ``api_url`` plus the ``pushed_at`` and
        ``tag_name`` values currently stored in the database.
    """
    # Start from an empty list on every call. Without this, tools reported in
    # an earlier cycle stayed in the module-level list and were re-sent on
    # every later cycle.
    tools_update_list.clear()
    for dist in data:
        query_result = tools_query_sqlite3(dist['tools_name'])
        if query_result is None:
            # Tool added to the YAML file after start-up: begin tracking it
            # instead of failing on ``None[0]``.
            logging.info("%s is not in the database yet; adding it", dist['tools_name'])
            tools_insert_into_sqlite3([dist])
            continue
        stored_pushed_at, stored_tag_name = query_result
        if dist['pushed_at'] != stored_pushed_at:
            # Build the line once: logging does not accept print-style varargs.
            message = " ".join(str(part) for part in (
                "今日获取时间: ", dist['pushed_at'], "获取数据库时间: ",
                stored_pushed_at, dist['tools_name'], "update!!!!",
            ))
            print(message)
            logging.info(message)
            # Hand back the values stored in the database (date and version).
            tools_update_list.append({
                "api_url": dist['api_url'],
                "pushed_at": stored_pushed_at,
                "tag_name": stored_tag_name,
            })
    return tools_update_list
# Fetch update details of a tool and send them to the configured channel
def _announce(*parts):
    """Print ``parts`` separated by spaces and write the same line to the log."""
    message = " ".join(str(part) for part in parts)
    print(message)
    logging.info(message)


def _push_to_configured_channel(text, body):
    """Send ``text``/``body`` through the channel named by ``load_config()[0]``."""
    config = load_config()
    channel = config[0]
    if channel == "dingding":
        dingding(text, body, config[2], config[3])
    elif channel == "server":
        server(text, body, config[2])
    elif channel == "tgbot":
        tgbot(text, body, config[2], config[3])


def _save_tool_state(tools_name, pushed_at, tag_name=None):
    """Store the new ``pushed_at`` (and ``tag_name`` when given) of a tool."""
    conn = sqlite3.connect('data.db')
    try:
        cur = conn.cursor()
        if tag_name is not None:
            cur.execute(
                "UPDATE redteam_tools_monitor SET tag_name = ? WHERE tools_name = ?",
                (tag_name, tools_name),
            )
        cur.execute(
            "UPDATE redteam_tools_monitor SET pushed_at = ? WHERE tools_name = ?",
            (pushed_at, tools_name),
        )
        conn.commit()
    finally:
        conn.close()


def send_dingding(url, query_pushed_at, query_tag_name):
    """Notify about an updated tool and store its new state.

    Some tools publish no releases; for those, and for pushes that do not
    bring a new release tag, the latest commit message is used as the update
    description.

    Args:
        url: repository API URL; ``/releases`` and ``/commits`` are appended.
        query_pushed_at: ``pushed_at`` value stored in the database.
        query_tag_name: ``tag_name`` value stored in the database.
    """
    releases = requests.get(url + '/releases', headers=github_headers, timeout=10).json()
    repo_info = requests.get(url, headers=github_headers, timeout=10).json()
    new_pushed_at = re.findall(r'\d{4}-\d{2}-\d{2}', repo_info['pushed_at'])[0]
    tools_name = url.split('/')[-1]
    has_releases = len(releases) != 0

    if has_releases:
        latest_release = releases[0]
        tag_name = latest_release['tag_name']
        _announce("[*] 数据库里的pushed_at -->", query_pushed_at, ";;;; api的pushed_at -->", new_pushed_at)
        if query_pushed_at != new_pushed_at and tag_name != query_tag_name:
            # New release: a missing or empty body falls back to the placeholder.
            update_log = latest_release.get('body') or "作者未写更新内容"
            download_url = latest_release['html_url']
            text = r'** ' + tools_name + r' ** 工具,版本更新啦!'
            body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "工具更新日志:" + "\r\n" + update_log
            _push_to_configured_channel(text, body)
            _save_tool_state(tools_name, new_pushed_at, tag_name)
            _announce("[+] tools_name -->", tools_name, "pushed_at 已更新现在pushed_at 为 -->", new_pushed_at, "tag_name 已更新现在tag_name为 -->", tag_name)
            return

    # Either there are no releases or the push brought no new tag: describe
    # the update using the latest commit. The request is authenticated and
    # time-limited like every other GitHub call in this function.
    commits = requests.get(url + '/commits', headers=github_headers, timeout=10).json()
    latest_commit = commits[0]
    download_url = latest_commit['html_url']
    if has_releases:
        try:
            update_log = latest_commit['commit']['message']
        except (KeyError, TypeError):
            update_log = "作者未写更新内容具体点击更新详情地址的URL进行查看"
        text = r'** ' + tools_name + r' ** 工具小更新了一波!'
        body = "工具名称:" + tools_name + "\r\n" + "更新详情地址:" + download_url + "\r\n" + "commit更新日志" + "\r\n" + update_log
    else:
        update_log = latest_commit['commit']['message']
        text = r'** ' + tools_name + r' ** 工具更新啦!'
        body = "工具名称:" + tools_name + "\r\n" + "工具地址:" + download_url + "\r\n" + "commit更新日志" + "\r\n" + update_log
    _push_to_configured_channel(text, body)
    _save_tool_state(tools_name, new_pushed_at)
    _announce("[+] tools_name -->", tools_name, "pushed_at 已更新现在pushed_at 为 -->", new_pushed_at)
# Compute an MD5 hex digest
def nmd5(str):
    """Return the hexadecimal MD5 digest of ``str``.

    Args:
        str: text (encoded as UTF-8 before hashing) or an already encoded
            ``bytes``/``bytearray`` value. The parameter name shadows the
            builtin and is kept only for compatibility with keyword callers.

    Returns:
        The 32-character lowercase hex digest.
    """
    if isinstance(str, (bytes, bytearray)):
        raw = bytes(str)
    else:
        raw = str.encode(encoding='utf-8')
    return hashlib.md5(raw).hexdigest()
# Youdao translation
def translate(word):
    """Translate ``word`` with the Youdao web translation endpoint.

    Args:
        word: text to translate.

    Returns:
        str: the concatenated ``tgt`` segments of the first translation
        result, or ``word`` unchanged when the request fails or the response
        has no usable ``translateResult``.
    """
    user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36'
    bv = nmd5(user_agent)
    lts = str(round(time.time() * 1000))
    salt = lts + '90'
    # If translation fails with {'errorCode': 50}, check fanyi.min.js:
    # https://shared.ydstatic.com/fanyi/newweb/v1.1.7/scripts/newweb/fanyi.min.js
    # Search for: fanyideskweb sign: n.md5("fanyideskweb" + e + i + "Y2FYu%TNSbMCxc3t2u^XT")
    # and replace the secret below if it has changed.
    strexample = 'fanyideskweb' + word + salt + 'Y2FYu%TNSbMCxc3t2u^XT'
    sign = nmd5(strexample)
    data = {
        'i': word,
        'from': 'AUTO',
        'to': 'AUTO',
        'smartresult': 'dict',
        'client': 'fanyideskweb',
        'salt': salt,
        'sign': sign,
        'lts': lts,
        'bv': bv,
        'doctype': 'json',
        'version': '2.1',
        'keyfrom': 'fanyi.web',
        'action': 'FY_BY_CLICKBUTTION',
    }
    url = 'http://fanyi.youdao.com/translate_o?smartresult=dict&smartresult=rule'
    header = {
        'User-Agent': user_agent,
        'Referer': 'http://fanyi.youdao.com/',
        'Origin': 'http://fanyi.youdao.com',
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'X-Requested-With': 'XMLHttpRequest',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Connection': 'keep-alive',
        'Host': 'fanyi.youdao.com',
        'cookie': '_ntes_nnid=937f1c788f1e087cf91d616319dc536a,1564395185984; OUTFOX_SEARCH_USER_ID_NCOO=; OUTFOX_SEARCH_USER_ID=-10218418@11.136.67.24; JSESSIONID=; ___rl__test__cookies=1'
    }
    try:
        # A timeout keeps a stalled translation service from blocking the monitor.
        res = requests.post(url=url, data=data, headers=header, timeout=10)
        result_dict = res.json()
        return "".join(segment['tgt'] for segment in result_dict['translateResult'][0])
    except (requests.RequestException, ValueError, LookupError, TypeError) as e:
        # Sending the untranslated description is better than losing the
        # notification because the translation service failed.
        logging.warning("translate failed, using the original text: %s", e)
        return word
# DingTalk
def dingding(text, msg,webhook,secretKey):
    """Send ``text`` and ``msg`` as one plain-text DingTalk robot message.

    Args:
        text: first line of the message.
        msg: remainder of the message, placed after a CRLF.
        webhook: webhook passed to ``DingtalkChatbot``.
        secretKey: value passed as the chatbot's ``secret``.
    """
    content = '{}\r\n{}'.format(text, msg)
    robot = cb.DingtalkChatbot(webhook, secret=secretKey)
    robot.send_text(msg=content, is_at_all=False)
# ServerChan (server酱) http://sc.ftqq.com/?c=code
def server(text, msg, sckey):
    """Push a message through ServerChan.

    Args:
        text: message title, sent as the ``text`` query parameter.
        msg: message body, sent as the ``desp`` query parameter.
        sckey: the ServerChan SCKEY that forms part of the request URL.
    """
    uri = 'https://sc.ftqq.com/{}.send'.format(sckey)
    # ``params`` URL-encodes line breaks, '&', '#' and non-ASCII text, which the
    # hand-built query string did not. The GitHub Authorization header is
    # deliberately not sent: it would leak the GitHub token to a third party.
    requests.get(uri, params={'text': text, 'desp': msg}, timeout=10)
# Telegram bot push support
def tgbot(text, msg, token, group_id):
    """Send ``text`` and ``msg`` as one Telegram message.

    Args:
        text: first line of the message.
        msg: remainder of the message, placed after a CRLF.
        token: Telegram bot token.
        group_id: chat id the message is sent to.
    """
    import telegram
    # 'xxx'.format(token) always evaluated to the literal 'xxx', so the
    # configured token was never used.
    bot = telegram.Bot(token=token)
    bot.send_message(chat_id=group_id, text='{}\r\n{}'.format(text, msg))
# Look up the description of a CVE by name and translate it
def get_cve_des_zh(cve):
    """Fetch the description of ``cve`` from cve.mitre.org and translate it.

    Args:
        cve: CVE identifier appended to the lookup URL.

    Returns:
        The result of ``translate()`` applied to the description text.

    Raises:
        IndexError: when the description cell is not found in the page.
    """
    # Pause before each lookup, as the original did.
    time.sleep(3)
    query_cve_url = "https://cve.mitre.org/cgi-bin/cvename.cgi?name=" + cve
    # The GitHub Authorization header is not sent here: it is not needed and
    # would hand the GitHub token to an unrelated site.
    response = requests.get(query_cve_url, timeout=10)
    html = etree.HTML(response.text)
    des = html.xpath('//*[@id="GeneratedTable"]/table//tr[4]/td/text()')[0].strip()
    return translate(des)
# Send CVE information to the configured channel
def sendNews(data):
    """Send one notification per CVE entry through the configured channel.

    For every entry the CVE id is extracted from ``cve_name``, its description
    is fetched and translated with ``get_cve_des_zh``, and the message is
    pushed through the channel named by ``load_config()[0]``. Entries without
    a CVE id or without a description are skipped. A failure while handling
    one entry is logged and does not stop the remaining entries.

    Args:
        data: sequence of dicts with the keys ``cve_name`` and ``cve_url``.
    """
    if not data:
        return
    text = r'有新的CVE送达'
    try:
        # Read the configuration once rather than several times per entry.
        config = load_config()
    except Exception as e:
        print("sendNews 函数 error:{}".format(e))
        logging.error("sendNews 函数 error:{}".format(e))
        return
    channel = config[0]

    for item in data:
        try:
            id_match = re.search(r'(cve-\d+-\d+)', item['cve_name'])
            if id_match is None:
                # No CVE id in the repository name.
                continue
            cve_name = id_match.group(1).upper()
            cve_zh = get_cve_des_zh(cve_name)
            body = "CVE编号" + cve_name + "\r\n" + "Github地址" + str(item['cve_url']) + "\r\n" + "CVE描述" + "\r\n" + cve_zh
            if channel == "dingding":
                dingding(text, body, config[2], config[3])
                print("钉钉 发送 CVE 成功")
                logging.info("钉钉 发送 CVE 成功")
            elif channel == "server":
                server(text, body, config[2])
                print("server酱 发送 CVE 成功")
                logging.info("server酱 发送 CVE 成功")
            elif channel == "tgbot":
                tgbot(text, body, config[2], config[3])
                print("tgbot 发送 CVE 成功")
                logging.info("tgbot 发送 CVE 成功")
        except IndexError:
            # get_cve_des_zh found no description for this CVE; skip it.
            continue
        except Exception as e:
            # Handled per entry, so one failure no longer drops the rest.
            print("sendNews 函数 error:{}".format(e))
            logging.error("sendNews 函数 error:{}".format(e))
# Script entry point
if __name__ == '__main__':
    # The file header documents a check every 3 minutes. Without a pause the
    # loop polls GitHub continuously and spins at full speed when an error
    # keeps recurring.
    POLL_INTERVAL_SECONDS = 3 * 60

    print("cve 和 github 发布工具 监控中 ...")
    logging.info("cve 和 github 发布工具 监控中 ...")
    # Initialisation: create the tables and record the current tool state.
    create_database()
    tools_list = load_tools_list()
    tools_data = get_pushed_at_time(tools_list)
    tools_insert_into_sqlite3(tools_data)
    while True:
        try:
            # CVE part
            cve_data = getNews()
            today_cve_data = get_today_cve_info(cve_data)
            sendNews(today_cve_data)
            cve_insert_into_sqlite3(today_cve_data)
            # Red-team tools part
            time.sleep(3)
            tools_list = load_tools_list()
            data2 = get_pushed_at_time(tools_list)
            data3 = get_tools_update_list(data2)
            for updated_tool in data3:
                send_dingding(updated_tool['api_url'], updated_tool['pushed_at'], updated_tool['tag_name'])
        except Exception as e:
            print("main 函数 遇到错误-->{}".format(e))
            logging.error("main 函数 遇到错误-->{}".format(e))
        # Wait before the next cycle, whether or not this one succeeded.
        time.sleep(POLL_INTERVAL_SECONDS)