#!/usr/bin/env python3 """ 闲鱼商品搜索模块 基于 Playwright 实现真实的闲鱼商品搜索功能 """ import asyncio import json import time from datetime import datetime from typing import Dict, List, Any, Optional from loguru import logger try: from playwright.async_api import async_playwright PLAYWRIGHT_AVAILABLE = True except ImportError: PLAYWRIGHT_AVAILABLE = False logger.warning("Playwright 未安装,将使用模拟数据") class XianyuSearcher: """闲鱼商品搜索器 - 基于 Playwright""" def __init__(self): self.browser = None self.context = None self.page = None self.api_responses = [] async def safe_get(self, data, *keys, default="暂无"): """安全获取嵌套字典值""" for key in keys: try: data = data[key] except (KeyError, TypeError, IndexError): return default return data async def init_browser(self): """初始化浏览器""" if not PLAYWRIGHT_AVAILABLE: raise Exception("Playwright 未安装,无法使用真实搜索功能") if not self.browser: playwright = await async_playwright().start() logger.info("正在启动浏览器...") self.browser = await playwright.chromium.launch( headless=True, # 无头模式 args=[ '--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage', '--disable-accelerated-2d-canvas', '--no-first-run', '--no-zygote', '--disable-gpu' ] ) self.context = await self.browser.new_context( user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ) self.page = await self.context.new_page() async def close_browser(self): """关闭浏览器""" if self.browser: await self.browser.close() self.browser = None self.context = None self.page = None async def search_items(self, keyword: str, page: int = 1, page_size: int = 20) -> Dict[str, Any]: """ 搜索闲鱼商品 - 使用 Playwright 获取真实数据 Args: keyword: 搜索关键词 page: 页码,从1开始 page_size: 每页数量 Returns: 搜索结果字典,包含items列表和总数 """ try: if not PLAYWRIGHT_AVAILABLE: logger.error("Playwright 不可用,无法获取真实数据") return { 'items': [], 'total': 0, 'error': 'Playwright 不可用,无法获取真实数据' } logger.info(f"使用 Playwright 搜索闲鱼商品: 关键词='{keyword}', 页码={page}, 每页={page_size}") await self.init_browser() # 清空之前的API响应 self.api_responses = [] data_list = [] # 设置API响应监听器 async def on_response(response): """处理API响应,解析数据""" if "h5api.m.goofish.com/h5/mtop.taobao.idlemtopsearch.pc.search" in response.url: try: # 检查响应状态 if response.status != 200: logger.warning(f"API响应状态异常: {response.status}") return # 安全地获取响应内容 try: result_json = await response.json() except Exception as json_error: logger.warning(f"无法解析响应JSON: {str(json_error)}") return self.api_responses.append(result_json) logger.info(f"捕获到API响应,URL: {response.url}") items = result_json.get("data", {}).get("resultList", []) logger.info(f"从API获取到 {len(items)} 条原始数据") for item in items: try: parsed_item = await self._parse_real_item(item) if parsed_item: data_list.append(parsed_item) except Exception as parse_error: logger.warning(f"解析单个商品失败: {str(parse_error)}") continue except Exception as e: logger.warning(f"响应处理异常: {str(e)}") try: logger.info("正在访问闲鱼首页...") await self.page.goto("https://www.goofish.com", timeout=30000) await self.page.wait_for_load_state("networkidle", timeout=10000) logger.info(f"正在搜索关键词: {keyword}") await self.page.fill('input[class*="search-input"]', keyword) # 注册响应监听 self.page.on("response", on_response) await self.page.click('button[type="submit"]') await self.page.wait_for_load_state("networkidle", timeout=15000) # 等待第一页API响应 logger.info("等待第一页API响应...") await asyncio.sleep(5) # 尝试处理弹窗 try: await self.page.keyboard.press('Escape') await asyncio.sleep(1) except: pass # 等待更多数据 await asyncio.sleep(3) first_page_count = len(data_list) logger.info(f"第1页完成,获取到 {first_page_count} 条数据") # 如果需要获取指定页数据,实现翻页逻辑 if page > 1: # 清空之前的数据,只保留目标页的数据 data_list.clear() await self._navigate_to_page(page) # 根据"人想要"数量进行倒序排列 data_list.sort(key=lambda x: x.get('want_count', 0), reverse=True) total_count = len(data_list) logger.info(f"搜索完成,总共获取到 {total_count} 条真实数据,已按想要人数排序") return { 'items': data_list, 'total': total_count, 'is_real_data': True, 'source': 'playwright' } finally: await self.close_browser() except Exception as e: logger.error(f"Playwright 搜索失败: {str(e)}") # 如果 Playwright 失败,返回错误信息 return { 'items': [], 'total': 0, 'error': f'搜索失败: {str(e)}' } async def _get_fallback_data(self, keyword: str, page: int, page_size: int) -> Dict[str, Any]: """获取备选数据(模拟数据)""" logger.info(f"使用备选数据: 关键词='{keyword}', 页码={page}, 每页={page_size}") # 模拟搜索延迟 await asyncio.sleep(0.5) # 生成模拟数据 mock_items = [] start_index = (page - 1) * page_size for i in range(page_size): item_index = start_index + i + 1 mock_items.append({ 'item_id': f'mock_{keyword}_{item_index}', 'title': f'{keyword}相关商品 #{item_index} [模拟数据]', 'price': f'{100 + item_index * 10}', 'seller_name': f'卖家{item_index}', 'item_url': f'https://www.goofish.com/item?id=mock_{keyword}_{item_index}', 'publish_time': '2025-07-28', 'tags': [f'标签{i+1}', f'分类{i+1}'], 'main_image': f'https://via.placeholder.com/200x200?text={keyword}商品{item_index}', 'raw_data': { 'mock': True, 'keyword': keyword, 'index': item_index } }) # 模拟总数 total_items = 100 + hash(keyword) % 500 logger.info(f"备选数据生成完成: 找到{len(mock_items)}个商品,总计{total_items}个") return { 'items': mock_items, 'total': total_items, 'is_fallback': True } async def _parse_real_item(self, item_data: Dict[str, Any]) -> Optional[Dict[str, Any]]: """解析真实的闲鱼商品数据""" try: main_data = await self.safe_get(item_data, "data", "item", "main", "exContent", default={}) click_params = await self.safe_get(item_data, "data", "item", "main", "clickParam", "args", default={}) # 解析商品信息 title = await self.safe_get(main_data, "title", default="未知标题") # 价格处理 price_parts = await self.safe_get(main_data, "price", default=[]) price = "价格异常" if isinstance(price_parts, list): price = "".join([str(p.get("text", "")) for p in price_parts if isinstance(p, dict)]) price = price.replace("当前价", "").strip() # 统一价格格式处理 if price and price != "价格异常": # 先移除所有¥符号,避免重复 clean_price = price.replace('¥', '').strip() # 处理万单位的价格 if "万" in clean_price: try: numeric_price = clean_price.replace('万', '').strip() price_value = float(numeric_price) * 10000 price = f"¥{price_value:.0f}" except: price = f"¥{clean_price}" # 如果转换失败,保持原样但确保有¥符号 else: # 普通价格,确保有¥符号 if clean_price and (clean_price[0].isdigit() or clean_price.replace('.', '').isdigit()): price = f"¥{clean_price}" else: price = clean_price if clean_price else "价格异常" # 只提取"想要人数"标签 fish_tags_content = "" fish_tags = await self.safe_get(main_data, "fishTags", default={}) # 遍历所有类型的标签 (r2, r3, r4等) for tag_type, tag_data in fish_tags.items(): if isinstance(tag_data, dict) and "tagList" in tag_data: tag_list = tag_data.get("tagList", []) for tag_item in tag_list: if isinstance(tag_item, dict) and "data" in tag_item: content = tag_item["data"].get("content", "") # 只保留包含"人想要"的标签 if content and "人想要" in content: fish_tags_content = content break if fish_tags_content: # 找到后就退出 break # 其他字段解析 area = await self.safe_get(main_data, "area", default="地区未知") seller = await self.safe_get(main_data, "userNickName", default="匿名卖家") raw_link = await self.safe_get(item_data, "data", "item", "main", "targetUrl", default="") image_url = await self.safe_get(main_data, "picUrl", default="") # 获取商品ID item_id = await self.safe_get(click_params, "item_id", default="未知ID") # 处理发布时间 publish_time = "未知时间" publish_timestamp = click_params.get("publishTime", "") if publish_timestamp and publish_timestamp.isdigit(): try: publish_time = datetime.fromtimestamp( int(publish_timestamp)/1000 ).strftime("%Y-%m-%d %H:%M") except: pass # 提取"人想要"的数字用于排序 want_count = self._extract_want_count(fish_tags_content) return { "item_id": item_id, "title": title, "price": price, "seller_name": seller, "item_url": raw_link.replace("fleamarket://", "https://www.goofish.com/"), "main_image": f"https:{image_url}" if image_url and not image_url.startswith("http") else image_url, "publish_time": publish_time, "tags": [fish_tags_content] if fish_tags_content else [], "area": area, "want_count": want_count, # 添加想要人数用于排序 "raw_data": item_data } except Exception as e: logger.warning(f"解析真实商品数据失败: {str(e)}") return None def _extract_want_count(self, tags_content: str) -> int: """从标签内容中提取"人想要"的数字""" try: if not tags_content or "人想要" not in tags_content: return 0 # 使用正则表达式提取数字 import re # 匹配类似 "123人想要" 或 "1.2万人想要" 的格式 pattern = r'(\d+(?:\.\d+)?(?:万)?)\s*人想要' match = re.search(pattern, tags_content) if match: number_str = match.group(1) if '万' in number_str: # 处理万单位 number = float(number_str.replace('万', '')) * 10000 return int(number) else: return int(float(number_str)) return 0 except Exception as e: logger.warning(f"提取想要人数失败: {str(e)}") return 0 async def _navigate_to_page(self, target_page: int): """导航到指定页面""" try: logger.info(f"正在导航到第 {target_page} 页...") # 等待页面稳定 await asyncio.sleep(2) # 查找并点击下一页按钮 next_button_selectors = [ '.search-page-tiny-arrow-right--oXVFaRao', # 用户找到的正确选择器 '[class*="search-page-tiny-arrow-right"]', # 更通用的版本 'button[aria-label="下一页"]', 'button:has-text("下一页")', 'a:has-text("下一页")', '.ant-pagination-next', 'li.ant-pagination-next a', 'a[aria-label="下一页"]', '[class*="next"]', '[class*="pagination-next"]', 'button[title="下一页"]', 'a[title="下一页"]' ] # 从第2页开始点击 for current_page in range(2, target_page + 1): logger.info(f"正在点击到第 {current_page} 页...") next_button_found = False for selector in next_button_selectors: try: next_button = self.page.locator(selector).first if await next_button.is_visible(timeout=3000): # 检查按钮是否可点击(不是禁用状态) is_disabled = await next_button.get_attribute("disabled") has_disabled_class = await next_button.evaluate("el => el.classList.contains('ant-pagination-disabled') || el.classList.contains('disabled')") if not is_disabled and not has_disabled_class: logger.info(f"找到下一页按钮,正在点击...") # 滚动到按钮位置 await next_button.scroll_into_view_if_needed() await asyncio.sleep(1) # 点击下一页 await next_button.click() await self.page.wait_for_load_state("networkidle", timeout=15000) # 等待新数据加载 await asyncio.sleep(5) logger.info(f"成功导航到第 {current_page} 页") next_button_found = True break except Exception as e: continue if not next_button_found: logger.warning(f"无法找到下一页按钮,停止在第 {current_page-1} 页") break except Exception as e: logger.error(f"导航到第 {target_page} 页失败: {str(e)}") async def search_multiple_pages(self, keyword: str, total_pages: int = 1) -> Dict[str, Any]: """ 搜索多页闲鱼商品 Args: keyword: 搜索关键词 total_pages: 总页数 Returns: 搜索结果字典,包含所有页面的items列表和总数 """ try: if not PLAYWRIGHT_AVAILABLE: logger.error("Playwright 不可用,无法获取真实数据") return { 'items': [], 'total': 0, 'error': 'Playwright 不可用,无法获取真实数据' } logger.info(f"使用 Playwright 搜索多页闲鱼商品: 关键词='{keyword}', 总页数={total_pages}") await self.init_browser() # 清空之前的API响应 self.api_responses = [] all_data_list = [] # 设置API响应监听器 async def on_response(response): """处理API响应,解析数据""" if "h5api.m.goofish.com/h5/mtop.taobao.idlemtopsearch.pc.search" in response.url: try: # 检查响应状态 if response.status != 200: logger.warning(f"API响应状态异常: {response.status}") return # 安全地获取响应内容 try: result_json = await response.json() except Exception as json_error: logger.warning(f"无法解析响应JSON: {str(json_error)}") return self.api_responses.append(result_json) logger.info(f"捕获到API响应,URL: {response.url}") items = result_json.get("data", {}).get("resultList", []) logger.info(f"从API获取到 {len(items)} 条原始数据") for item in items: try: parsed_item = await self._parse_real_item(item) if parsed_item: all_data_list.append(parsed_item) except Exception as parse_error: logger.warning(f"解析单个商品失败: {str(parse_error)}") continue except Exception as e: logger.warning(f"响应处理异常: {str(e)}") try: logger.info("正在访问闲鱼首页...") await self.page.goto("https://www.goofish.com", timeout=30000) await self.page.wait_for_load_state("networkidle", timeout=10000) logger.info(f"正在搜索关键词: {keyword}") await self.page.fill('input[class*="search-input"]', keyword) # 注册响应监听 self.page.on("response", on_response) await self.page.click('button[type="submit"]') await self.page.wait_for_load_state("networkidle", timeout=15000) # 等待第一页API响应 logger.info("等待第一页API响应...") await asyncio.sleep(5) # 尝试处理弹窗 try: await self.page.keyboard.press('Escape') await asyncio.sleep(1) except: pass # 等待更多数据 await asyncio.sleep(3) first_page_count = len(all_data_list) logger.info(f"第1页完成,获取到 {first_page_count} 条数据") # 如果需要获取更多页数据 if total_pages > 1: for page_num in range(2, total_pages + 1): logger.info(f"正在获取第 {page_num} 页数据...") # 等待页面稳定 await asyncio.sleep(2) # 查找并点击下一页按钮 next_button_found = False next_button_selectors = [ '.search-page-tiny-arrow-right--oXVFaRao', '[class*="search-page-tiny-arrow-right"]', 'button[aria-label="下一页"]', 'button:has-text("下一页")', 'a:has-text("下一页")', '.ant-pagination-next', 'li.ant-pagination-next a', 'a[aria-label="下一页"]' ] for selector in next_button_selectors: try: next_button = self.page.locator(selector).first if await next_button.is_visible(timeout=3000): # 检查按钮是否可点击 is_disabled = await next_button.get_attribute("disabled") has_disabled_class = await next_button.evaluate("el => el.classList.contains('ant-pagination-disabled') || el.classList.contains('disabled')") if not is_disabled and not has_disabled_class: logger.info(f"找到下一页按钮,正在点击到第 {page_num} 页...") # 记录点击前的数据量 before_click_count = len(all_data_list) # 滚动到按钮位置并点击 await next_button.scroll_into_view_if_needed() await asyncio.sleep(1) await next_button.click() await self.page.wait_for_load_state("networkidle", timeout=15000) # 等待新数据加载 await asyncio.sleep(5) # 检查是否有新数据 after_click_count = len(all_data_list) new_items = after_click_count - before_click_count if new_items > 0: logger.info(f"第 {page_num} 页成功,新增 {new_items} 条数据") next_button_found = True break else: logger.warning(f"第 {page_num} 页点击后没有新数据,可能已到最后一页") next_button_found = False break except Exception as e: continue if not next_button_found: logger.warning(f"无法获取第 {page_num} 页数据,停止在第 {page_num-1} 页") break # 根据"人想要"数量进行倒序排列 all_data_list.sort(key=lambda x: x.get('want_count', 0), reverse=True) total_count = len(all_data_list) logger.info(f"多页搜索完成,总共获取到 {total_count} 条真实数据,已按想要人数排序") return { 'items': all_data_list, 'total': total_count, 'is_real_data': True, 'source': 'playwright' } finally: await self.close_browser() except Exception as e: logger.error(f"Playwright 多页搜索失败: {str(e)}") # 如果 Playwright 失败,返回错误信息 return { 'items': [], 'total': 0, 'error': f'多页搜索失败: {str(e)}' } async def _get_multiple_fallback_data(self, keyword: str, total_pages: int) -> Dict[str, Any]: """获取多页备选数据(模拟数据)""" logger.info(f"使用多页备选数据: 关键词='{keyword}', 总页数={total_pages}") # 模拟搜索延迟 await asyncio.sleep(1) # 生成多页模拟数据 all_mock_items = [] for page in range(1, total_pages + 1): page_size = 20 # 每页20条 start_index = (page - 1) * page_size for i in range(page_size): item_index = start_index + i + 1 all_mock_items.append({ 'item_id': f'mock_{keyword}_{item_index}', 'title': f'{keyword}相关商品 #{item_index} [模拟数据-第{page}页]', 'price': f'{100 + item_index * 10}', 'seller_name': f'卖家{item_index}', 'item_url': f'https://www.goofish.com/item?id=mock_{keyword}_{item_index}', 'publish_time': '2025-07-28', 'tags': [f'标签{i+1}', f'分类{i+1}'], 'main_image': f'https://via.placeholder.com/200x200?text={keyword}商品{item_index}', 'raw_data': { 'mock': True, 'keyword': keyword, 'index': item_index, 'page': page } }) total_count = len(all_mock_items) logger.info(f"多页备选数据生成完成: 找到{total_count}个商品,共{total_pages}页") return { 'items': all_mock_items, 'total': total_count, 'is_fallback': True } async def _parse_item_old(self, item_data: Dict[str, Any]) -> Optional[Dict[str, Any]]: """ 解析单个商品数据 Args: item_data: 商品原始数据 Returns: 解析后的商品信息 """ try: # 基本信息 - 适配多种可能的字段名 item_id = (item_data.get('id') or item_data.get('itemId') or item_data.get('item_id') or item_data.get('spuId') or '') title = (item_data.get('title') or item_data.get('name') or item_data.get('itemTitle') or item_data.get('subject') or '') # 价格处理 price_raw = (item_data.get('price') or item_data.get('priceText') or item_data.get('currentPrice') or item_data.get('realPrice') or '') # 清理价格格式 if isinstance(price_raw, (int, float)): price = str(price_raw) elif isinstance(price_raw, str): price = price_raw.replace('¥', '').replace('元', '').strip() else: price = '价格面议' # 卖家信息 seller_info = (item_data.get('seller') or item_data.get('user') or item_data.get('owner') or {}) seller_name = '' if seller_info: seller_name = (seller_info.get('nick') or seller_info.get('nickname') or seller_info.get('name') or seller_info.get('userName') or '匿名用户') # 商品链接 if item_id: item_url = f"https://www.goofish.com/item?id={item_id}" else: item_url = item_data.get('url', item_data.get('link', '')) # 发布时间 publish_time = (item_data.get('publishTime') or item_data.get('createTime') or item_data.get('gmtCreate') or item_data.get('time') or '') # 格式化时间 if publish_time and isinstance(publish_time, (int, float)): import datetime publish_time = datetime.datetime.fromtimestamp(publish_time / 1000).strftime('%Y-%m-%d') # 商品标签 tags = item_data.get('tags', item_data.get('labels', [])) tag_names = [] if isinstance(tags, list): for tag in tags: if isinstance(tag, dict): tag_name = (tag.get('name') or tag.get('text') or tag.get('label') or '') if tag_name: tag_names.append(tag_name) elif isinstance(tag, str): tag_names.append(tag) # 图片 images = (item_data.get('images') or item_data.get('pics') or item_data.get('pictures') or item_data.get('imgList') or []) main_image = '' if images and len(images) > 0: if isinstance(images[0], str): main_image = images[0] elif isinstance(images[0], dict): main_image = (images[0].get('url') or images[0].get('src') or images[0].get('image') or '') # 如果没有图片,使用默认占位图 if not main_image: main_image = f'https://via.placeholder.com/200x200?text={title[:10] if title else "商品"}...' return { 'item_id': item_id, 'title': title, 'price': price, 'seller_name': seller_name, 'item_url': item_url, 'publish_time': publish_time, 'tags': tag_names, 'main_image': main_image, 'raw_data': item_data # 保留原始数据以备后用 } except Exception as e: logger.warning(f"解析商品数据失败: {str(e)}") return None # 全局搜索器实例 _searcher = None async def search_xianyu_items(keyword: str, page: int = 1, page_size: int = 20) -> Dict[str, Any]: """ 搜索闲鱼商品的便捷函数 Args: keyword: 搜索关键词 page: 页码 page_size: 每页数量 Returns: 搜索结果 """ global _searcher if not _searcher: _searcher = XianyuSearcher() try: return await _searcher.search_items(keyword, page, page_size) except Exception as e: logger.error(f"搜索商品失败: {str(e)}") return { 'items': [], 'total': 0, 'error': str(e) } async def search_multiple_pages_xianyu(keyword: str, total_pages: int = 1) -> Dict[str, Any]: """ 搜索多页闲鱼商品的便捷函数 Args: keyword: 搜索关键词 total_pages: 总页数 Returns: 搜索结果 """ global _searcher if not _searcher: _searcher = XianyuSearcher() try: return await _searcher.search_multiple_pages(keyword, total_pages) except Exception as e: logger.error(f"多页搜索商品失败: {str(e)}") return { 'items': [], 'total': 0, 'error': str(e) } async def close_searcher(): """关闭搜索器""" global _searcher if _searcher: await _searcher.close_session() _searcher = None async def get_item_detail_from_api(item_id: str) -> Optional[str]: """ 从外部API获取商品详情 Args: item_id: 商品ID Returns: 商品详情文本,获取失败返回None """ try: # 使用默认的API配置 api_base_url = 'https://selfapi.zhinianboke.com/api/getItemDetail' timeout_seconds = 10 api_url = f"{api_base_url}/{item_id}" logger.info(f"正在从外部API获取商品详情: {item_id}") # 使用简单的HTTP请求 import aiohttp timeout = aiohttp.ClientTimeout(total=timeout_seconds) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(api_url) as response: if response.status == 200: result = await response.json() # 检查返回状态 if result.get('status') == '200' and result.get('data'): item_detail = result['data'] logger.info(f"成功获取商品详情: {item_id}, 长度: {len(item_detail)}") return item_detail else: logger.warning(f"API返回状态异常: {result.get('status')}, message: {result.get('message')}") return None else: logger.warning(f"API请求失败: HTTP {response.status}") return None except asyncio.TimeoutError: logger.warning(f"获取商品详情超时: {item_id}") return None except Exception as e: logger.error(f"获取商品详情异常: {item_id}, 错误: {str(e)}") return None