# -*- coding: utf-8 -*- import requests from bs4 import BeautifulSoup import json import time import os import re from datetime import datetime, timedelta from requests.exceptions import RequestException from loguru import logger headers = { "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3", "Accept-Language": "en-US,en;q=0.9", "Cache-Control": "no-cache", "Connection": "keep-alive", "Pragma": "no-cache", "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "same-origin", "Sec-Fetch-User": "?1", "Upgrade-Insecure-Requests": "1", "sec-ch-ua": '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"Windows"' } def fetch_html(url, timeout=10): try: response = requests.get(url, headers=headers, timeout=timeout) response.raise_for_status() return response.text except requests.Timeout: logger.warning(f"请求 {url} 超时,跳过保存操作。") except requests.exceptions.RequestException as e: logger.warning(f"请求 {url} 时发生错误: {e}") def normalize_pub_date(pub_date): # 检查是否为日期格式 2024-12-06 if re.match(r'\d{4}-\d{2}-\d{2}', pub_date): return pub_date # 检查是否为 '8天前' 格式 days_ago_match = re.match(r'(\d+)天前', pub_date) if days_ago_match: days_ago = int(days_ago_match.group(1)) return (datetime.now() - timedelta(days=days_ago)).strftime('%Y-%m-%d') # 检查是否为 '小时前' 格式 hours_ago_match = re.match(r'(\d+)小时前', pub_date) if hours_ago_match: return datetime.now().strftime('%Y-%m-%d') # 如果都不匹配,返回原始值 return pub_date def parse_html(html_content): soup = BeautifulSoup(html_content, 'html.parser') # 提取所有符合条件的标签 tables = soup.find_all('table', class_='result') results = [] for table in tables: # 提取标题和链接 h3_tag = table.find('h3', class_='t') if h3_tag: a_tag = h3_tag.find('a') title = a_tag.get_text(strip=True) if a_tag else "No title found" link = a_tag['href'] if a_tag else "No link found" else: title = "No title found" link = "No link found" # 提取摘要 td_element = table.find('td', class_='f') # 从td中进一步查找div.realtime之后的所有文本 realtime_div = td_element.find('div', class_='realtime') if realtime_div: text_parts = [] for sibling in realtime_div.next_siblings: if sibling.name == 'font': break if isinstance(sibling, str) and sibling.strip(): text_parts.append(sibling.strip()) elif sibling.name and sibling.get_text(strip=True): text_parts.append(sibling.get_text(strip=True)) # 将所有文本片段合并成一个字符串,并整理格式 cleaned_text = ' '.join(text_parts) # 提取发布者 publisher_tag = table.find('a', class_='m') publisher = publisher_tag.get_text(strip=True) if publisher_tag else "百度快照" # 提取时间戳 time_tag = table.find('div', class_='realtime') pub_date = time_tag.get_text(strip=True) if time_tag else "No timestamp found" pub_date = normalize_pub_date(pub_date) results.append({ "title": title, "link": link, "description": cleaned_text, "author": publisher, "pubDate": pub_date }) return results def baidu_main(keywords): all_results = {} # 用于存储所有关键词的结果 for keyword in keywords: url = f"https://www.baidu.com/s?tn=baidurt&cl=3&rn=20&ie=utf-8&rsv_bp=1&wd={keyword}" # print(url) html_content = fetch_html(url) # 将解析后的数据保存到 JSON 文件 with open('./test.html', 'w', encoding='utf-8') as f: f.write(html_content) # print(html_content) if html_content is None: logger.warning(f"无法获取百度搜索内容,跳过保存操作。关键词: {keyword}") continue results = parse_html(html_content) # 移除非法代理对 logger.info(f"关键词【{keyword}】的百度搜索内容保存成功。") all_results[keyword] = results # 将结果存储在字典中,以关键词为键 time.sleep(5) # 将所有结果转换为JSON格式 json_results = json.dumps(all_results, ensure_ascii=False, indent=4) # print(json_results) # 确保目录存在 os.makedirs(os.path.dirname('./resources/JSON/baidu.json'), exist_ok=True) # 将解析后的数据保存到 JSON 文件 with open('./resources/JSON/baidu.json', 'w', encoding='utf-8') as f: f.write(json_results) if __name__ == "__main__": keywords = ["齐鲁银行"] baidu_main(keywords)