198 lines
6.2 KiB
Python
198 lines
6.2 KiB
Python
# -*- coding: utf-8 -*-
|
||
import os
|
||
import requests
|
||
import xml.etree.ElementTree as ET
|
||
import json
|
||
from requests.exceptions import RequestException
|
||
from loguru import logger
|
||
|
||
# 测试用爬虫请求头
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"Cache-Control": "no-cache",
|
||
"Upgrade-Insecure-Requests": "1",
|
||
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.55 Safari/537.36",
|
||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
|
||
"Sec-Fetch-Site": "same-origin",
|
||
"Sec-Fetch-Mode": "navigate",
|
||
"Sec-Fetch-User": "?1",
|
||
"Sec-Fetch-Dest": "document",
|
||
"Accept-Language": "zh-CN,zh;q=0.9"
|
||
}
|
||
|
||
doonsec_headers = {
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:133.0) Gecko/20100101 Firefox/133.0",
|
||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||
"Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
|
||
"Accept-Encoding": "gzip, deflate, br",
|
||
"Upgrade-Insecure-Requests": "1",
|
||
"Sec-Fetch-Dest": "document",
|
||
"Sec-Fetch-Mode": "navigate",
|
||
"Sec-Fetch-Site": "none",
|
||
"Sec-Fetch-User": "?1",
|
||
"Priority": "u=0, i",
|
||
"Te": "trailers",
|
||
"Connection": "keep-alive"
|
||
}
|
||
|
||
def fetch_rss(url, headers, timeout=60):
|
||
try:
|
||
response = requests.get(url, headers=headers, timeout=timeout)
|
||
response.raise_for_status() # 检查请求是否成功
|
||
return response.content
|
||
except requests.Timeout:
|
||
logger.warning(f"请求 {url} 超时,跳过保存操作。")
|
||
return None
|
||
except RequestException as e:
|
||
logger.error(f"请求 {url} 时发生错误: {e}")
|
||
return None # 返回None表示请求失败
|
||
|
||
def parse_rss(rss_content):
|
||
items = []
|
||
root = ET.fromstring(rss_content)
|
||
for item in root.findall('.//item'):
|
||
item_dict = {}
|
||
for child in item:
|
||
item_dict[child.tag] = child.text
|
||
items.append(item_dict)
|
||
return items
|
||
|
||
def save_to_json(data, filename):
|
||
with open(filename, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=4)
|
||
|
||
|
||
# seebug 爬虫
|
||
def seebug_main():
|
||
url = "https://paper.seebug.org/rss/"
|
||
rss_content = fetch_rss(url, headers)
|
||
if rss_content is None:
|
||
logger.warning("无法获取Seebug社区RSS内容,跳过保存操作。")
|
||
return
|
||
|
||
items = parse_rss(rss_content)
|
||
|
||
# 确保目录存在
|
||
os.makedirs(os.path.dirname('./JSON/seebug.json'), exist_ok=True)
|
||
|
||
# 将解析后的数据保存到 JSON 文件
|
||
save_to_json(items, './JSON/seebug.json')
|
||
logger.info("数据已保存到 ./JSON/seebug.json!")
|
||
|
||
# 4hou 爬虫
|
||
def M_4hou_main():
|
||
url = "https://www.4hou.com/feed"
|
||
rss_content = fetch_rss(url, headers)
|
||
|
||
if rss_content is None:
|
||
logger.warning("无法获取嘶吼RSS内容,跳过保存操作。")
|
||
return
|
||
|
||
items = parse_rss(rss_content)
|
||
|
||
# 确保目录存在
|
||
os.makedirs(os.path.dirname('./JSON/4hou.json'), exist_ok=True)
|
||
|
||
# 将解析后的数据保存到 JSON 文件
|
||
save_to_json(items, './JSON/4hou.json')
|
||
logger.info("数据已保存到 ./JSON/4hou.json!")
|
||
|
||
# 安全客 爬虫
|
||
def anquanke_main():
|
||
url = "https://api.anquanke.com/data/v1/rss"
|
||
rss_content = fetch_rss(url, headers)
|
||
|
||
if rss_content is None:
|
||
logger.warning("无法获取安全客RSS内容,跳过保存操作。")
|
||
return
|
||
|
||
items = parse_rss(rss_content)
|
||
|
||
# 确保目录存在
|
||
os.makedirs(os.path.dirname('./JSON/anquanke.json'), exist_ok=True)
|
||
|
||
# 将解析后的数据保存到 JSON 文件
|
||
save_to_json(items, './JSON/anquanke.json')
|
||
logger.info("数据已保存到 ./JSON/anquanke.json!")
|
||
|
||
# sec_wiki 爬虫
|
||
def sec_wiki_main():
|
||
url = "https://www.sec_wiki.com/news/rss"
|
||
rss_content = fetch_rss(url, headers)
|
||
|
||
if rss_content is None:
|
||
logger.warning("无法获取安全维基RSS内容,跳过保存操作。")
|
||
return
|
||
|
||
items = parse_rss(rss_content)
|
||
|
||
# 确保目录存在
|
||
os.makedirs(os.path.dirname('./JSON/sec_wiki.json'), exist_ok=True)
|
||
|
||
# 将解析后的数据保存到 JSON 文件
|
||
save_to_json(items, './JSON/sec_wiki.json')
|
||
logger.info("数据已保存到 ./JSON/sec_wiki.json!")
|
||
|
||
# 华为 爬虫
|
||
def huawei_main():
|
||
url = "https://www.huawei.com/cn/rss-feeds/psirt/rss"
|
||
rss_content = fetch_rss(url, headers)
|
||
|
||
if rss_content is None:
|
||
logger.warning("无法获取华为RSS内容,跳过保存操作。")
|
||
return
|
||
|
||
items = parse_rss(rss_content)
|
||
|
||
# 确保目录存在
|
||
os.makedirs(os.path.dirname('./JSON/huawei.json'), exist_ok=True)
|
||
|
||
# 将解析后的数据保存到 JSON 文件
|
||
save_to_json(items, './JSON/huawei.json')
|
||
logger.info("数据已保存到 ./JSON/huawei.json!")
|
||
|
||
# 洞见微信聚合爬虫
|
||
def doonsec_main():
|
||
url = "https://wechat.doonsec.com/bayes_rss.xml"
|
||
rss_content = fetch_rss(url, doonsec_headers)
|
||
|
||
if rss_content is None:
|
||
logger.warning("无法获取洞见微信聚合RSS内容,跳过保存操作。")
|
||
return
|
||
|
||
items = parse_rss(rss_content)
|
||
|
||
# 确保目录存在
|
||
os.makedirs(os.path.dirname('./JSON/doonsec.json'), exist_ok=True)
|
||
|
||
# 将解析后的数据保存到 JSON 文件
|
||
save_to_json(items, './JSON/doonsec.json')
|
||
logger.info("数据已保存到 ./JSON/doonsec.json!")
|
||
|
||
# 奇安信攻防社区 爬虫
|
||
def qianxin_main():
|
||
url = "https://forum.butian.net/Rss"
|
||
rss_content = fetch_rss(url, headers)
|
||
|
||
if rss_content is None:
|
||
logger.warning("无法获取奇安信攻防社区RSS内容,跳过保存操作。")
|
||
return
|
||
|
||
items = parse_rss(rss_content)
|
||
|
||
# 确保目录存在
|
||
os.makedirs(os.path.dirname('./JSON/qianxin.json'), exist_ok=True)
|
||
|
||
# 将解析后的数据保存到 JSON 文件
|
||
save_to_json(items, './JSON/qianxin.json')
|
||
logger.info("数据已保存到 ./JSON/qianxin.json!")
|
||
|
||
def run():
|
||
seebug_main()
|
||
M_4hou_main()
|
||
anquanke_main()
|
||
# sec_wiki_main()
|
||
huawei_main()
|
||
doonsec_main()
|
||
qianxin_main()
|