# -*- coding: utf-8 -*-
import os
import requests
import xml.etree.ElementTree as ET
import json
from requests.exceptions import RequestException
from loguru import logger

# Request headers used by the crawlers
headers = {
    "Content-Type": "application/json",
    "Cache-Control": "no-cache",
    "Upgrade-Insecure-Requests": "1",
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.55 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
    "Sec-Fetch-Site": "same-origin",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-User": "?1",
    "Sec-Fetch-Dest": "document",
    "Accept-Language": "zh-CN,zh;q=0.9"
}

# Dedicated headers for the Doonsec WeChat aggregation feed
doonsec_headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:133.0) Gecko/20100101 Firefox/133.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
    "Accept-Encoding": "gzip, deflate, br",
    "Upgrade-Insecure-Requests": "1",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "none",
    "Sec-Fetch-User": "?1",
    "Priority": "u=0, i",
    "Te": "trailers",
    "Connection": "keep-alive"
}


def fetch_rss(url, headers, timeout=60):
    """Fetch the raw RSS feed; return the response body, or None on failure."""
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()  # raise if the request was not successful
        return response.content
    except requests.Timeout:
        logger.warning(f"Request to {url} timed out; skipping save.")
        return None
    except RequestException as e:
        logger.error(f"Error while requesting {url}: {e}")
        return None  # None tells the caller the request failed


def parse_rss(rss_content):
    """Parse RSS XML and return a list of dicts, one per <item> element."""
    items = []
    root = ET.fromstring(rss_content)
    for item in root.findall('.//item'):
        item_dict = {}
        for child in item:
            item_dict[child.tag] = child.text
        items.append(item_dict)
    return items


def save_to_json(data, filename):
    """Write the parsed items to a UTF-8 JSON file."""
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4)


# Seebug crawler
def seebug_main():
    url = "https://paper.seebug.org/rss/"
    rss_content = fetch_rss(url, headers)
    if rss_content is None:
        logger.warning("Failed to fetch the Seebug RSS feed; skipping save.")
        return
    items = parse_rss(rss_content)
    # Make sure the output directory exists
    os.makedirs(os.path.dirname('./JSON/seebug.json'), exist_ok=True)
    # Save the parsed data to a JSON file
    save_to_json(items, './JSON/seebug.json')
    logger.info("Data saved to ./JSON/seebug.json!")


# 4hou crawler
def M_4hou_main():
    url = "https://www.4hou.com/feed"
    rss_content = fetch_rss(url, headers)
    if rss_content is None:
        logger.warning("Failed to fetch the 4hou RSS feed; skipping save.")
        return
    items = parse_rss(rss_content)
    # Make sure the output directory exists
    os.makedirs(os.path.dirname('./JSON/4hou.json'), exist_ok=True)
    # Save the parsed data to a JSON file
    save_to_json(items, './JSON/4hou.json')
    logger.info("Data saved to ./JSON/4hou.json!")


# Anquanke crawler
def anquanke_main():
    url = "https://api.anquanke.com/data/v1/rss"
    rss_content = fetch_rss(url, headers)
    if rss_content is None:
        logger.warning("Failed to fetch the Anquanke RSS feed; skipping save.")
        return
    items = parse_rss(rss_content)
    # Make sure the output directory exists
    os.makedirs(os.path.dirname('./JSON/anquanke.json'), exist_ok=True)
    # Save the parsed data to a JSON file
    save_to_json(items, './JSON/anquanke.json')
    logger.info("Data saved to ./JSON/anquanke.json!")


# SecWiki crawler
def sec_wiki_main():
    url = "https://www.sec_wiki.com/news/rss"
    rss_content = fetch_rss(url, headers)
    if rss_content is None:
        logger.warning("Failed to fetch the SecWiki RSS feed; skipping save.")
        return
    items = parse_rss(rss_content)
    # Make sure the output directory exists
    os.makedirs(os.path.dirname('./JSON/sec_wiki.json'), exist_ok=True)
    # Save the parsed data to a JSON file
    save_to_json(items, './JSON/sec_wiki.json')
    logger.info("Data saved to ./JSON/sec_wiki.json!")


# Huawei crawler
def huawei_main():
    url = "https://www.huawei.com/cn/rss-feeds/psirt/rss"
    rss_content = fetch_rss(url, headers)
    if rss_content is None:
        logger.warning("Failed to fetch the Huawei RSS feed; skipping save.")
        return
    items = parse_rss(rss_content)
    # Make sure the output directory exists
    os.makedirs(os.path.dirname('./JSON/huawei.json'), exist_ok=True)
    # Save the parsed data to a JSON file
    save_to_json(items, './JSON/huawei.json')
    logger.info("Data saved to ./JSON/huawei.json!")


# Doonsec WeChat aggregation crawler
def doonsec_main():
    url = "https://wechat.doonsec.com/bayes_rss.xml"
    rss_content = fetch_rss(url, doonsec_headers)
    if rss_content is None:
        logger.warning("Failed to fetch the Doonsec WeChat aggregation RSS feed; skipping save.")
        return
    items = parse_rss(rss_content)
    # Make sure the output directory exists
    os.makedirs(os.path.dirname('./JSON/doonsec.json'), exist_ok=True)
    # Save the parsed data to a JSON file
    save_to_json(items, './JSON/doonsec.json')
    logger.info("Data saved to ./JSON/doonsec.json!")


# Qi'anxin attack-and-defense community crawler
def qianxin_main():
    url = "https://forum.butian.net/Rss"
    rss_content = fetch_rss(url, headers)
    if rss_content is None:
        logger.warning("Failed to fetch the Qi'anxin community RSS feed; skipping save.")
        return
    items = parse_rss(rss_content)
    # Make sure the output directory exists
    os.makedirs(os.path.dirname('./JSON/qianxin.json'), exist_ok=True)
    # Save the parsed data to a JSON file
    save_to_json(items, './JSON/qianxin.json')
    logger.info("Data saved to ./JSON/qianxin.json!")


def run():
    """Run all enabled crawlers in sequence."""
    seebug_main()
    M_4hou_main()
    anquanke_main()
    # sec_wiki_main()
    huawei_main()
    doonsec_main()
    qianxin_main()