PyBot/media/common.py

195 lines
6.0 KiB
Python
Raw Normal View History

2024-12-06 16:32:34 +08:00
# -*- coding: utf-8 -*-
2024-12-03 00:03:14 +08:00
import os
import requests
import xml.etree.ElementTree as ET
import json
2024-12-06 16:53:58 +08:00
from requests.exceptions import RequestException
2024-12-08 00:18:31 +08:00
import logging
# 设置日志记录
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.handlers.clear() # 清除已有的处理器
file_handler = logging.FileHandler('./log/spider.log', mode='a', encoding='utf-8')
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logger.propagate = False # 禁用日志传递
2024-12-03 00:03:14 +08:00
# 测试用爬虫请求头
headers = {
"Content-Type": "application/json",
"Cache-Control": "no-cache",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.55 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-User": "?1",
"Sec-Fetch-Dest": "document",
"Accept-Language": "zh-CN,zh;q=0.9"
}
def fetch_rss(url, headers):
2024-12-06 16:53:58 +08:00
try:
response = requests.get(url, headers=headers)
response.raise_for_status() # 检查请求是否成功
return response.content
except RequestException as e:
2024-12-08 00:18:31 +08:00
logger.error(f"请求 {url} 时发生错误: {e}")
2024-12-06 16:53:58 +08:00
return None # 返回None表示请求失败
2024-12-03 00:03:14 +08:00
def parse_rss(rss_content):
items = []
root = ET.fromstring(rss_content)
for item in root.findall('.//item'):
item_dict = {}
for child in item:
item_dict[child.tag] = child.text
items.append(item_dict)
return items
def save_to_json(data, filename):
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
2024-12-08 00:18:31 +08:00
2024-12-03 00:03:14 +08:00
# seebug 爬虫
def seebug_main():
url = "https://paper.seebug.org/rss/"
rss_content = fetch_rss(url, headers)
2024-12-08 00:18:31 +08:00
if rss_content is None:
logger.warning("无法获取Seebug社区RSS内容跳过保存操作。")
return
items = parse_rss(rss_content)
# 确保目录存在
os.makedirs(os.path.dirname('./JSON/seebug.json'), exist_ok=True)
# 将解析后的数据保存到 JSON 文件
save_to_json(items, './JSON/seebug.json')
logger.info("数据已保存到 ./JSON/seebug.json")
2024-12-03 00:03:14 +08:00
# 4hou 爬虫
def M_4hou_main():
url = "https://www.4hou.com/feed"
rss_content = fetch_rss(url, headers)
2024-12-08 00:18:31 +08:00
if rss_content is None:
logger.warning("无法获取嘶吼RSS内容跳过保存操作。")
return
items = parse_rss(rss_content)
# 确保目录存在
os.makedirs(os.path.dirname('./JSON/4hou.json'), exist_ok=True)
# 将解析后的数据保存到 JSON 文件
save_to_json(items, './JSON/4hou.json')
logger.info("数据已保存到 ./JSON/4hou.json")
2024-12-03 00:03:14 +08:00
# 安全客 爬虫
def anquanke_main():
url = "https://api.anquanke.com/data/v1/rss"
rss_content = fetch_rss(url, headers)
2024-12-08 00:18:31 +08:00
if rss_content is None:
logger.warning("无法获取安全客RSS内容跳过保存操作。")
return
items = parse_rss(rss_content)
# 确保目录存在
os.makedirs(os.path.dirname('./JSON/anquanke.json'), exist_ok=True)
# 将解析后的数据保存到 JSON 文件
save_to_json(items, './JSON/anquanke.json')
logger.info("数据已保存到 ./JSON/anquanke.json")
2024-12-03 00:03:14 +08:00
# sec_wiki 爬虫
def sec_wiki_main():
url = "https://www.sec_wiki.com/news/rss"
rss_content = fetch_rss(url, headers)
2024-12-08 00:18:31 +08:00
if rss_content is None:
logger.warning("无法获取安全维基RSS内容跳过保存操作。")
return
items = parse_rss(rss_content)
# 确保目录存在
os.makedirs(os.path.dirname('./JSON/sec_wiki.json'), exist_ok=True)
# 将解析后的数据保存到 JSON 文件
save_to_json(items, './JSON/sec_wiki.json')
logger.info("数据已保存到 ./JSON/sec_wiki.json")
2024-12-03 00:03:14 +08:00
# 华为 爬虫
def huawei_main():
url = "https://www.huawei.com/cn/rss-feeds/psirt/rss"
rss_content = fetch_rss(url, headers)
2024-12-08 00:18:31 +08:00
if rss_content is None:
logger.warning("无法获取华为RSS内容跳过保存操作。")
return
items = parse_rss(rss_content)
# 确保目录存在
os.makedirs(os.path.dirname('./JSON/huawei.json'), exist_ok=True)
# 将解析后的数据保存到 JSON 文件
save_to_json(items, './JSON/huawei.json')
logger.info("数据已保存到 ./JSON/huawei.json")
2024-12-03 00:03:14 +08:00
# 洞见微信聚合爬虫
def doonsec_main():
url = "https://wechat.doonsec.com/bayes_rss.xml"
rss_content = fetch_rss(url, headers)
2024-12-08 00:18:31 +08:00
if rss_content is None:
logger.warning("无法获取洞见微信聚合RSS内容跳过保存操作。")
return
items = parse_rss(rss_content)
# 确保目录存在
os.makedirs(os.path.dirname('./JSON/doonsec.json'), exist_ok=True)
# 将解析后的数据保存到 JSON 文件
save_to_json(items, './JSON/doonsec.json')
logger.info("数据已保存到 ./JSON/doonsec.json")
2024-12-03 00:03:14 +08:00
2024-12-06 16:53:58 +08:00
# 奇安信攻防社区 爬虫
2024-12-03 00:03:14 +08:00
def qianxin_main():
url = "https://forum.butian.net/Rss"
rss_content = fetch_rss(url, headers)
2024-12-08 00:18:31 +08:00
if rss_content is None:
logger.warning("无法获取奇安信攻防社区RSS内容跳过保存操作。")
return
items = parse_rss(rss_content)
# 确保目录存在
os.makedirs(os.path.dirname('./JSON/qianxin.json'), exist_ok=True)
# 将解析后的数据保存到 JSON 文件
save_to_json(items, './JSON/qianxin.json')
logger.info("数据已保存到 ./JSON/qianxin.json")
2024-12-03 00:03:14 +08:00
def run():
2024-12-03 17:33:37 +08:00
seebug_main()
M_4hou_main()
anquanke_main()
2024-12-03 00:03:14 +08:00
# sec_wiki_main()
huawei_main()
doonsec_main()
2024-12-05 00:03:51 +08:00
qianxin_main()