# -*- coding: utf-8 -*-
import json
import os
import re
import time
from datetime import datetime, timedelta

import requests
from bs4 import BeautifulSoup
from loguru import logger
from requests.exceptions import RequestException

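# Request headers copied from a desktop Chrome session; the browser-like
# User-Agent and client hints are assumed to be enough for Baidu to serve the
# regular HTML result page.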
headers = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
    "Accept-Language": "en-US,en;q=0.9",
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "Pragma": "no-cache",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "same-origin",
    "Sec-Fetch-User": "?1",
    "Upgrade-Insecure-Requests": "1",
    "sec-ch-ua": '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"'
}


def fetch_html(url, timeout=10):
    """Fetch a page and return its HTML text, or None on failure."""
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response.text
    except requests.Timeout:
        logger.warning(f"Request to {url} timed out; skipping save.")
    except RequestException as e:
        logger.warning(f"Error while requesting {url}: {e}")
    return None

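# Minimal usage sketch for fetch_html (the URL here is illustrative only):
#     html = fetch_html("https://www.baidu.com/s?wd=test")
#     if html is not None:
#         results = parse_html(html)
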
def normalize_pub_date(pub_date):
    """Normalize Baidu's relative timestamps into a YYYY-MM-DD string."""
    # Already an absolute date such as 2024-12-06
    if re.match(r'\d{4}-\d{2}-\d{2}', pub_date):
        return pub_date

    # Relative format such as '8天前' (N days ago)
    days_ago_match = re.match(r'(\d+)天前', pub_date)
    if days_ago_match:
        days_ago = int(days_ago_match.group(1))
        return (datetime.now() - timedelta(days=days_ago)).strftime('%Y-%m-%d')

    # Relative format such as '3小时前' (N hours ago): use today's date
    hours_ago_match = re.match(r'(\d+)小时前', pub_date)
    if hours_ago_match:
        return datetime.now().strftime('%Y-%m-%d')

    # Nothing matched: return the original value unchanged
    return pub_date

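# Expected behaviour of normalize_pub_date, assuming the script runs on
# 2024-12-14 (the dates below shift with the current day):
#     normalize_pub_date('2024-12-06')  ->  '2024-12-06'   # absolute date kept
#     normalize_pub_date('8天前')       ->  '2024-12-06'   # 8 days before today
#     normalize_pub_date('3小时前')     ->  '2024-12-14'   # same day, date only
#     normalize_pub_date('昨天')        ->  '昨天'          # unknown format, unchanged
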
def parse_html(html_content):
    """Parse a Baidu result page and return a list of result dicts."""
    soup = BeautifulSoup(html_content, 'html.parser')

    # Each organic result is rendered as a <table class="result">
    tables = soup.find_all('table', class_='result')

    results = []

    for table in tables:
        # Title and link
        h3_tag = table.find('h3', class_='t')
        if h3_tag:
            a_tag = h3_tag.find('a')
            title = a_tag.get_text(strip=True) if a_tag else "No title found"
            link = a_tag['href'] if a_tag else "No link found"
        else:
            title = "No title found"
            link = "No link found"

        # Snippet: all text between <div class="realtime"> and the first <font>
        cleaned_text = ""
        td_element = table.find('td', class_='f')
        realtime_div = td_element.find('div', class_='realtime') if td_element else None
        if realtime_div:
            text_parts = []
            for sibling in realtime_div.next_siblings:
                if sibling.name == 'font':
                    break
                if isinstance(sibling, str) and sibling.strip():
                    text_parts.append(sibling.strip())
                elif sibling.name and sibling.get_text(strip=True):
                    text_parts.append(sibling.get_text(strip=True))

            # Join the fragments into a single snippet string
            cleaned_text = ' '.join(text_parts)

        # Publisher (falls back to "百度快照")
        publisher_tag = table.find('a', class_='m')
        publisher = publisher_tag.get_text(strip=True) if publisher_tag else "百度快照"

        # Timestamp
        time_tag = table.find('div', class_='realtime')
        pub_date = time_tag.get_text(strip=True) if time_tag else "No timestamp found"
        pub_date = normalize_pub_date(pub_date)

        results.append({
            "title": title,
            "link": link,
            "description": cleaned_text,
            "author": publisher,
            "pubDate": pub_date
        })

    return results

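# Shape of each entry returned by parse_html (field values are illustrative,
# not real search results):
#     {
#         "title": "...",
#         "link": "https://www.baidu.com/link?url=...",
#         "description": "...",
#         "author": "百度快照",
#         "pubDate": "2024-12-06"
#     }
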
def baidu_main(keywords):
    """Search Baidu for each keyword and save the parsed results as JSON."""
    all_results = {}  # Accumulates results for every keyword

    for keyword in keywords:
        url = f"https://www.baidu.com/s?tn=baidurt&cl=3&rn=20&ie=utf-8&rsv_bp=1&wd={keyword}"
        # print(url)
        html_content = fetch_html(url)

        if html_content is None:
            logger.warning(f"Could not fetch Baidu search results; skipping save. Keyword: {keyword}")
            continue

        # Dump the raw HTML for debugging
        with open('./test.html', 'w', encoding='utf-8') as f:
            f.write(html_content)
        # print(html_content)

        results = parse_html(html_content)
        logger.info(f"Baidu search results for keyword [{keyword}] saved successfully.")
        all_results[keyword] = results  # Keyed by keyword
        time.sleep(5)

    # Serialize all results to JSON
    json_results = json.dumps(all_results, ensure_ascii=False, indent=4)
    # print(json_results)

    # Make sure the output directory exists
    os.makedirs(os.path.dirname('./resources/JSON/baidu.json'), exist_ok=True)

    # Write the parsed data to a JSON file
    with open('./resources/JSON/baidu.json', 'w', encoding='utf-8') as f:
        f.write(json_results)

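# The resulting ./resources/JSON/baidu.json maps each keyword to its list of
# parsed entries, roughly (values illustrative):
#     {
#         "齐鲁银行": [
#             {"title": "...", "link": "...", "description": "...",
#              "author": "...", "pubDate": "..."}
#         ]
#     }
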
if __name__ == "__main__":
    keywords = ["齐鲁银行"]
    baidu_main(keywords)