PyBot/spider/common.py

198 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import os
import requests
import xml.etree.ElementTree as ET
import json
from requests.exceptions import RequestException
from loguru import logger
# 测试用爬虫请求头
headers = {
"Content-Type": "application/json",
"Cache-Control": "no-cache",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.55 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-User": "?1",
"Sec-Fetch-Dest": "document",
"Accept-Language": "zh-CN,zh;q=0.9"
}
doonsec_headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:133.0) Gecko/20100101 Firefox/133.0",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
"Accept-Encoding": "gzip, deflate, br",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"Priority": "u=0, i",
"Te": "trailers",
"Connection": "keep-alive"
}
def fetch_rss(url, headers, timeout=60):
try:
response = requests.get(url, headers=headers, timeout=timeout)
response.raise_for_status() # 检查请求是否成功
return response.content
except requests.Timeout:
logger.warning(f"请求 {url} 超时,跳过保存操作。")
return None
except RequestException as e:
logger.error(f"请求 {url} 时发生错误: {e}")
return None # 返回None表示请求失败
def parse_rss(rss_content):
items = []
root = ET.fromstring(rss_content)
for item in root.findall('.//item'):
item_dict = {}
for child in item:
item_dict[child.tag] = child.text
items.append(item_dict)
return items
def save_to_json(data, filename):
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
# seebug 爬虫
def seebug_main():
url = "https://paper.seebug.org/rss/"
rss_content = fetch_rss(url, headers)
if rss_content is None:
logger.warning("无法获取Seebug社区RSS内容跳过保存操作。")
return
items = parse_rss(rss_content)
# 确保目录存在
os.makedirs(os.path.dirname('./JSON/seebug.json'), exist_ok=True)
# 将解析后的数据保存到 JSON 文件
save_to_json(items, './JSON/seebug.json')
logger.info("数据已保存到 ./JSON/seebug.json")
# 4hou 爬虫
def M_4hou_main():
url = "https://www.4hou.com/feed"
rss_content = fetch_rss(url, headers)
if rss_content is None:
logger.warning("无法获取嘶吼RSS内容跳过保存操作。")
return
items = parse_rss(rss_content)
# 确保目录存在
os.makedirs(os.path.dirname('./JSON/4hou.json'), exist_ok=True)
# 将解析后的数据保存到 JSON 文件
save_to_json(items, './JSON/4hou.json')
logger.info("数据已保存到 ./JSON/4hou.json")
# 安全客 爬虫
def anquanke_main():
url = "https://api.anquanke.com/data/v1/rss"
rss_content = fetch_rss(url, headers)
if rss_content is None:
logger.warning("无法获取安全客RSS内容跳过保存操作。")
return
items = parse_rss(rss_content)
# 确保目录存在
os.makedirs(os.path.dirname('./JSON/anquanke.json'), exist_ok=True)
# 将解析后的数据保存到 JSON 文件
save_to_json(items, './JSON/anquanke.json')
logger.info("数据已保存到 ./JSON/anquanke.json")
# sec_wiki 爬虫
def sec_wiki_main():
url = "https://www.sec_wiki.com/news/rss"
rss_content = fetch_rss(url, headers)
if rss_content is None:
logger.warning("无法获取安全维基RSS内容跳过保存操作。")
return
items = parse_rss(rss_content)
# 确保目录存在
os.makedirs(os.path.dirname('./JSON/sec_wiki.json'), exist_ok=True)
# 将解析后的数据保存到 JSON 文件
save_to_json(items, './JSON/sec_wiki.json')
logger.info("数据已保存到 ./JSON/sec_wiki.json")
# 华为 爬虫
def huawei_main():
url = "https://www.huawei.com/cn/rss-feeds/psirt/rss"
rss_content = fetch_rss(url, headers)
if rss_content is None:
logger.warning("无法获取华为RSS内容跳过保存操作。")
return
items = parse_rss(rss_content)
# 确保目录存在
os.makedirs(os.path.dirname('./JSON/huawei.json'), exist_ok=True)
# 将解析后的数据保存到 JSON 文件
save_to_json(items, './JSON/huawei.json')
logger.info("数据已保存到 ./JSON/huawei.json")
# 洞见微信聚合爬虫
def doonsec_main():
url = "https://wechat.doonsec.com/bayes_rss.xml"
rss_content = fetch_rss(url, doonsec_headers)
if rss_content is None:
logger.warning("无法获取洞见微信聚合RSS内容跳过保存操作。")
return
items = parse_rss(rss_content)
# 确保目录存在
os.makedirs(os.path.dirname('./JSON/doonsec.json'), exist_ok=True)
# 将解析后的数据保存到 JSON 文件
save_to_json(items, './JSON/doonsec.json')
logger.info("数据已保存到 ./JSON/doonsec.json")
# 奇安信攻防社区 爬虫
def qianxin_main():
url = "https://forum.butian.net/Rss"
rss_content = fetch_rss(url, headers)
if rss_content is None:
logger.warning("无法获取奇安信攻防社区RSS内容跳过保存操作。")
return
items = parse_rss(rss_content)
# 确保目录存在
os.makedirs(os.path.dirname('./JSON/qianxin.json'), exist_ok=True)
# 将解析后的数据保存到 JSON 文件
save_to_json(items, './JSON/qianxin.json')
logger.info("数据已保存到 ./JSON/qianxin.json")
def run():
seebug_main()
M_4hou_main()
anquanke_main()
# sec_wiki_main()
huawei_main()
doonsec_main()
qianxin_main()