[feat] requests 请求 去除正则,使用json ; 增加监控github发布的工具更新情况

This commit is contained in:
yhy0 2021-07-29 16:58:48 +08:00
parent 1b81ed786e
commit 7ffada303c
2 changed files with 94 additions and 47 deletions

View File

@ -2,7 +2,7 @@
## 更新 ## 更新
合并@[Xc1Ym](https://github.com/yhy0/github-cve-monitor/pull/6) requests 请求 去除正则使用json ; 增加监控github发布的工具更新情况
通过https://cve.mitre.org/ 获取CVE具体描述, 并通过有道翻译为中文 通过https://cve.mitre.org/ 获取CVE具体描述, 并通过有道翻译为中文

View File

@ -8,14 +8,53 @@
# https://my.oschina.net/u/4581868/blog/4380482 # https://my.oschina.net/u/4581868/blog/4380482
# https://github.com/kiang70/Github-Monitor # https://github.com/kiang70/Github-Monitor
import requests, re, time import requests, time
import dingtalkchatbot.chatbot as cb import dingtalkchatbot.chatbot as cb
import datetime import datetime
import hashlib import hashlib
import json
from lxml import etree from lxml import etree
# 抓取本年cve
def getNews():
try:
# 抓取本年的
year = datetime.datetime.now().year
api = "https://api.github.com/search/repositories?q=CVE-{}&sort=updated".format(year)
json_str = requests.get(api).json()
cve_total_count = json_str['total_count']
cve_description = json_str['items'][0]['description']
cve_url = json_str['items'][0]['html_url']
return cve_total_count, cve_description, cve_url
except Exception as e:
print(e, "github链接不通")
return '', '', ''
# 通过 pushed_at 检查工具是否更新
def get_pushed_at_time(tools_list):
total_list = []
for url in tools_list:
pushed_at = requests.get(url).json()['pushed_at']
total_list.append(pushed_at)
return total_list
def get_update_log(url):
# 考虑到有的工具没有 releases, 则通过 commits 记录获取更新描述
# 判断是否有 releases 记录
json_str = requests.get(url + '/releases').json()
if len(json_str) != 0:
update_log = json_str[0]['body']
download_url = json_str[0]['html_url']
return update_log, download_url
else:
json_str = requests.get(url + '/releases').json()
update_log = json_str[0]['commit']['message']
return update_log, ''
# 创建md5对象 # 创建md5对象
def nmd5(str): def nmd5(str):
m = hashlib.md5() m = hashlib.md5()
@ -68,10 +107,7 @@ def translate(word):
} }
res = requests.post(url=url, data=data, headers=header) res = requests.post(url=url, data=data, headers=header)
t = res.content.decode('utf8') result_dict = res.json()
# 把返回来的json字符串解析成字典
result_dict = json.loads(t)
result = "" result = ""
for json_str in result_dict['translateResult'][0]: for json_str in result_dict['translateResult'][0]:
@ -80,22 +116,6 @@ def translate(word):
return result return result
def getNews():
try:
# 抓取本年的
year = datetime.datetime.now().year
api = "https://api.github.com/search/repositories?q=CVE-{}&sort=updated".format(year)
req = requests.get(api).text
cve_total_count = re.findall('"total_count":*.{1,10}"incomplete_results"',req)[0][14:17]
cve_description = re.findall('"description":*.{1,200}"fork"',req)[0].replace("\",\"fork\"",'').replace("\"description\":\"",'')
cve_url = re.findall('"svn_url":*.{1,200}"homepage"',req)[0].replace("\",\"homepage\"",'').replace("\"svn_url\":\"",'')
# 不推送 fork
if cve_description != '"description":null,"fork"':
return cve_total_count, cve_description, cve_url
except Exception as e:
print (e, "github链接不通")
# 钉钉 # 钉钉
def dingding(text, msg): def dingding(text, msg):
# 将此处换为钉钉机器人的api # 将此处换为钉钉机器人的api
@ -120,18 +140,13 @@ def tgbot(text,msg):
# 通过检查name 和 description 中是否存在test字样排除test # 通过检查name 和 description 中是否存在test字样排除test
def regular(req): def regular(req):
cve_name = re.findall('"name":*.{1,200}"full_name"', req)[0].replace("\"name\":\"",'').replace("\",\"full_name\"",'') cve_name = req['items'][0]['name']
cve_description = re.findall('"description":*.{1,200}"fork"', req)[0].replace("\",\"fork\"", '').replace( cve_description = req['items'][0]['description']
"\"description\":\"", '')
if cve_name.lower().find('test') == -1 and cve_description.lower().find('test') == -1: if cve_name.lower().find('test') == -1 and cve_description.lower().find('test') == -1:
return True return True
return False return False
# 获取cve 名字
def get_cve_name(req):
cve_name = re.findall('"name":*.{1,200}"full_name"', req)[0].replace("\"name\":\"", '').replace("\",\"full_name\"",'')
return cve_name
# 根据cve 名字,获取描述,并翻译 # 根据cve 名字,获取描述,并翻译
def get_cve_des_zh(cve): def get_cve_des_zh(cve):
query_cve_url = "https://cve.mitre.org/cgi-bin/cvename.cgi?name=" + cve query_cve_url = "https://cve.mitre.org/cgi-bin/cvename.cgi?name=" + cve
@ -140,29 +155,49 @@ def get_cve_des_zh(cve):
des = html.xpath('//*[@id="GeneratedTable"]/table//tr[4]/td/text()')[0].strip() des = html.xpath('//*[@id="GeneratedTable"]/table//tr[4]/td/text()')[0].strip()
return translate(des) return translate(des)
def sendNews(): def sendNews(tools_list):
while True: while True:
try: try:
print("cve 监控中 ...") print("cve 和 github 发布工具 监控中 ...")
# 抓取本年的 # 抓取本年的cve
year = datetime.datetime.now().year year = datetime.datetime.now().year
api = "https://api.github.com/search/repositories?q=CVE-{}&sort=updated".format(year) api = "https://api.github.com/search/repositories?q=CVE-{}&sort=updated".format(year)
# 请求API # 请求API
req = requests.get(api).text req = requests.get(api).json()
# 正则获取 total_count = req['total_count']
total_count = re.findall('"total_count":*.{1,10}"incomplete_results"', req)[0][14:17]
# 通过 pushed_at 检查工具是否更新
time_list1 = get_pushed_at_time(tools_list)
# 监控时间间隔3分钟 # 监控时间间隔3分钟
time.sleep(180) time.sleep(180)
# 推送正文内容
# 推送标题 time_list2 = get_pushed_at_time(tools_list)
text = r'有新的CVE送达'
regular(req) for i in range(len(tools_list)):
# 两次时间不相等,则代表工具更新
if time_list1[i] != time_list2[i]:
update_log, download_url = get_update_log(tools_list[i])
tools_name = tools_list[i].split('/')[-1]
text = r'** ' + tools_name + r' ** 工具更新啦!'
# body = ''
if download_url != '':
body = "工具名称:" + tools_name + "\r\n" + "工具下载地址:" + download_url + "\r\n" + "工具更新日志:" + update_log
else:
body = "工具名称:" + "\r\n" + "工具更新日志:" + update_log
# 三选一即可,没配置的 注释或者删掉
# server(text, body)
dingding(text, body)
# tgbot(text,body)
print(body)
# 检查name 和 description 中是否存在test字样 和 是否更新 # 检查name 和 description 中是否存在test字样 和 是否更新
if regular(req) and total_count != getNews()[0]: if regular(req) and total_count != getNews()[0]:
cve = get_cve_name(req) # 推送正文内容
cve_zh = get_cve_des_zh(cve) # 推送标题
text = r'有新的CVE送达'
# 获取 cve 名字 根据cve 名字,获取描述,并翻译
cve_zh = get_cve_des_zh(req['items'][0]['name'])
msg = "CVE编号"+ str(getNews()[1]) + "\r\n"+"Github地址"+ str(getNews()[2]) + "\r\n" + "CVE描述"+ cve_zh msg = "CVE编号"+ str(getNews()[1]) + "\r\n"+"Github地址"+ str(getNews()[2]) + "\r\n" + "CVE描述"+ cve_zh
# 三选一即可,没配置的 注释或者删掉 # 三选一即可,没配置的 注释或者删掉
@ -170,12 +205,24 @@ def sendNews():
dingding(text, msg) dingding(text, msg)
# tgbot(text,msg) # tgbot(text,msg)
print(msg) print(msg)
else:
pass
except Exception as e: except Exception as e:
raise e print("Program runing error:{}".format(e))
if __name__ == '__main__': if __name__ == '__main__':
sendNews() tools_list = [
"https://api.github.com/repos/BeichenDream/Godzilla",
"https://api.github.com/repos/rebeyond/Behinder",
"https://api.github.com/repos/AntSwordProject/antSword",
"https://api.github.com/repos/j1anFen/shiro_attack",
"https://api.github.com/repos/yhy0/ExpDemo-JavaFX",
"https://api.github.com/repos/yhy0/github-cve-monitor",
"https://api.github.com/repos/gentilkiwi/mimikatz",
"https://api.github.com/repos/ehang-io/nps",
"https://api.github.com/repos/chaitin/xray",
"https://api.github.com/repos/FunnyWolf/pystinger",
"https://api.github.com/repos/L-codes/Neo-reGeorg",
"https://api.github.com/repos/shadow1ng/fscan",
]
sendNews(tools_list)