xianyu-auto-reply/utils/item_search.py
2025-08-01 13:10:19 +08:00

954 lines
38 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
闲鱼商品搜索模块
基于 Playwright 实现真实的闲鱼商品搜索功能
"""
import asyncio
import json
import time
import sys
import os
from datetime import datetime
from typing import Dict, List, Any, Optional
from loguru import logger
# 修复Docker环境中的asyncio事件循环策略问题
if sys.platform.startswith('linux') or os.getenv('DOCKER_ENV'):
try:
# 在Linux/Docker环境中设置事件循环策略
asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
except Exception as e:
logger.warning(f"设置事件循环策略失败: {e}")
# 确保在Docker环境中使用正确的事件循环
if os.getenv('DOCKER_ENV'):
try:
# 强制使用SelectorEventLoop在Docker中更稳定
if hasattr(asyncio, 'SelectorEventLoop'):
loop = asyncio.SelectorEventLoop()
asyncio.set_event_loop(loop)
except Exception as e:
logger.warning(f"设置SelectorEventLoop失败: {e}")
try:
from playwright.async_api import async_playwright
PLAYWRIGHT_AVAILABLE = True
except ImportError:
PLAYWRIGHT_AVAILABLE = False
logger.warning("Playwright 未安装,将使用模拟数据")
class XianyuSearcher:
"""闲鱼商品搜索器 - 基于 Playwright"""
def __init__(self):
self.browser = None
self.context = None
self.page = None
self.api_responses = []
async def safe_get(self, data, *keys, default="暂无"):
"""安全获取嵌套字典值"""
for key in keys:
try:
data = data[key]
except (KeyError, TypeError, IndexError):
return default
return data
async def init_browser(self):
"""初始化浏览器"""
if not PLAYWRIGHT_AVAILABLE:
raise Exception("Playwright 未安装,无法使用真实搜索功能")
if not self.browser:
playwright = await async_playwright().start()
logger.info("正在启动浏览器...")
# Docker环境优化的浏览器启动参数
browser_args = [
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-dev-shm-usage',
'--disable-accelerated-2d-canvas',
'--no-first-run',
'--no-zygote',
'--disable-gpu',
'--disable-background-timer-throttling',
'--disable-backgrounding-occluded-windows',
'--disable-renderer-backgrounding',
'--disable-features=TranslateUI',
'--disable-ipc-flooding-protection',
'--disable-extensions',
'--disable-default-apps',
'--disable-sync',
'--disable-translate',
'--hide-scrollbars',
'--mute-audio',
'--no-default-browser-check',
'--no-pings',
'--single-process' # 在Docker中使用单进程模式
]
# 在Docker环境中添加额外参数
if os.getenv('DOCKER_ENV'):
browser_args.extend([
'--disable-background-networking',
'--disable-background-timer-throttling',
'--disable-client-side-phishing-detection',
'--disable-default-apps',
'--disable-hang-monitor',
'--disable-popup-blocking',
'--disable-prompt-on-repost',
'--disable-sync',
'--disable-web-resources',
'--metrics-recording-only',
'--no-first-run',
'--safebrowsing-disable-auto-update',
'--enable-automation',
'--password-store=basic',
'--use-mock-keychain'
])
self.browser = await playwright.chromium.launch(
headless=True, # 无头模式
args=browser_args
)
self.context = await self.browser.new_context(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
self.page = await self.context.new_page()
async def close_browser(self):
"""关闭浏览器"""
if self.browser:
await self.browser.close()
self.browser = None
self.context = None
self.page = None
async def search_items(self, keyword: str, page: int = 1, page_size: int = 20) -> Dict[str, Any]:
"""
搜索闲鱼商品 - 使用 Playwright 获取真实数据
Args:
keyword: 搜索关键词
page: 页码从1开始
page_size: 每页数量
Returns:
搜索结果字典包含items列表和总数
"""
try:
if not PLAYWRIGHT_AVAILABLE:
logger.error("Playwright 不可用,无法获取真实数据")
return {
'items': [],
'total': 0,
'error': 'Playwright 不可用,无法获取真实数据'
}
logger.info(f"使用 Playwright 搜索闲鱼商品: 关键词='{keyword}', 页码={page}, 每页={page_size}")
await self.init_browser()
# 清空之前的API响应
self.api_responses = []
data_list = []
# 设置API响应监听器
async def on_response(response):
"""处理API响应解析数据"""
if "h5api.m.goofish.com/h5/mtop.taobao.idlemtopsearch.pc.search" in response.url:
try:
# 检查响应状态
if response.status != 200:
logger.warning(f"API响应状态异常: {response.status}")
return
# 安全地获取响应内容
try:
result_json = await response.json()
except Exception as json_error:
logger.warning(f"无法解析响应JSON: {str(json_error)}")
return
self.api_responses.append(result_json)
logger.info(f"捕获到API响应URL: {response.url}")
items = result_json.get("data", {}).get("resultList", [])
logger.info(f"从API获取到 {len(items)} 条原始数据")
for item in items:
try:
parsed_item = await self._parse_real_item(item)
if parsed_item:
data_list.append(parsed_item)
except Exception as parse_error:
logger.warning(f"解析单个商品失败: {str(parse_error)}")
continue
except Exception as e:
logger.warning(f"响应处理异常: {str(e)}")
try:
logger.info("正在访问闲鱼首页...")
await self.page.goto("https://www.goofish.com", timeout=30000)
await self.page.wait_for_load_state("networkidle", timeout=10000)
logger.info(f"正在搜索关键词: {keyword}")
await self.page.fill('input[class*="search-input"]', keyword)
# 注册响应监听
self.page.on("response", on_response)
await self.page.click('button[type="submit"]')
await self.page.wait_for_load_state("networkidle", timeout=15000)
# 等待第一页API响应
logger.info("等待第一页API响应...")
await asyncio.sleep(5)
# 尝试处理弹窗
try:
await self.page.keyboard.press('Escape')
await asyncio.sleep(1)
except:
pass
# 等待更多数据
await asyncio.sleep(3)
first_page_count = len(data_list)
logger.info(f"第1页完成获取到 {first_page_count} 条数据")
# 如果需要获取指定页数据,实现翻页逻辑
if page > 1:
# 清空之前的数据,只保留目标页的数据
data_list.clear()
await self._navigate_to_page(page)
# 根据"人想要"数量进行倒序排列
data_list.sort(key=lambda x: x.get('want_count', 0), reverse=True)
total_count = len(data_list)
logger.info(f"搜索完成,总共获取到 {total_count} 条真实数据,已按想要人数排序")
return {
'items': data_list,
'total': total_count,
'is_real_data': True,
'source': 'playwright'
}
finally:
await self.close_browser()
except Exception as e:
error_msg = str(e)
logger.error(f"Playwright 搜索失败: {error_msg}")
# 检查是否是浏览器安装问题
if "Executable doesn't exist" in error_msg or "playwright install" in error_msg:
error_msg = "浏览器未安装。请在Docker容器中运行: playwright install chromium"
elif "BrowserType.launch" in error_msg:
error_msg = "浏览器启动失败。请确保Docker容器有足够的权限和资源"
# 如果 Playwright 失败,返回错误信息
return {
'items': [],
'total': 0,
'error': f'搜索失败: {error_msg}'
}
async def _get_fallback_data(self, keyword: str, page: int, page_size: int) -> Dict[str, Any]:
"""获取备选数据(模拟数据)"""
logger.info(f"使用备选数据: 关键词='{keyword}', 页码={page}, 每页={page_size}")
# 模拟搜索延迟
await asyncio.sleep(0.5)
# 生成模拟数据
mock_items = []
start_index = (page - 1) * page_size
for i in range(page_size):
item_index = start_index + i + 1
mock_items.append({
'item_id': f'mock_{keyword}_{item_index}',
'title': f'{keyword}相关商品 #{item_index} [模拟数据]',
'price': f'{100 + item_index * 10}',
'seller_name': f'卖家{item_index}',
'item_url': f'https://www.goofish.com/item?id=mock_{keyword}_{item_index}',
'publish_time': '2025-07-28',
'tags': [f'标签{i+1}', f'分类{i+1}'],
'main_image': f'https://via.placeholder.com/200x200?text={keyword}商品{item_index}',
'raw_data': {
'mock': True,
'keyword': keyword,
'index': item_index
}
})
# 模拟总数
total_items = 100 + hash(keyword) % 500
logger.info(f"备选数据生成完成: 找到{len(mock_items)}个商品,总计{total_items}")
return {
'items': mock_items,
'total': total_items,
'is_fallback': True
}
async def _parse_real_item(self, item_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""解析真实的闲鱼商品数据"""
try:
main_data = await self.safe_get(item_data, "data", "item", "main", "exContent", default={})
click_params = await self.safe_get(item_data, "data", "item", "main", "clickParam", "args", default={})
# 解析商品信息
title = await self.safe_get(main_data, "title", default="未知标题")
# 价格处理
price_parts = await self.safe_get(main_data, "price", default=[])
price = "价格异常"
if isinstance(price_parts, list):
price = "".join([str(p.get("text", "")) for p in price_parts if isinstance(p, dict)])
price = price.replace("当前价", "").strip()
# 统一价格格式处理
if price and price != "价格异常":
# 先移除所有¥符号,避免重复
clean_price = price.replace('¥', '').strip()
# 处理万单位的价格
if "" in clean_price:
try:
numeric_price = clean_price.replace('', '').strip()
price_value = float(numeric_price) * 10000
price = f"¥{price_value:.0f}"
except:
price = f"¥{clean_price}" # 如果转换失败,保持原样但确保有¥符号
else:
# 普通价格,确保有¥符号
if clean_price and (clean_price[0].isdigit() or clean_price.replace('.', '').isdigit()):
price = f"¥{clean_price}"
else:
price = clean_price if clean_price else "价格异常"
# 只提取"想要人数"标签
fish_tags_content = ""
fish_tags = await self.safe_get(main_data, "fishTags", default={})
# 遍历所有类型的标签 (r2, r3, r4等)
for tag_type, tag_data in fish_tags.items():
if isinstance(tag_data, dict) and "tagList" in tag_data:
tag_list = tag_data.get("tagList", [])
for tag_item in tag_list:
if isinstance(tag_item, dict) and "data" in tag_item:
content = tag_item["data"].get("content", "")
# 只保留包含"人想要"的标签
if content and "人想要" in content:
fish_tags_content = content
break
if fish_tags_content: # 找到后就退出
break
# 其他字段解析
area = await self.safe_get(main_data, "area", default="地区未知")
seller = await self.safe_get(main_data, "userNickName", default="匿名卖家")
raw_link = await self.safe_get(item_data, "data", "item", "main", "targetUrl", default="")
image_url = await self.safe_get(main_data, "picUrl", default="")
# 获取商品ID
item_id = await self.safe_get(click_params, "item_id", default="未知ID")
# 处理发布时间
publish_time = "未知时间"
publish_timestamp = click_params.get("publishTime", "")
if publish_timestamp and publish_timestamp.isdigit():
try:
publish_time = datetime.fromtimestamp(
int(publish_timestamp)/1000
).strftime("%Y-%m-%d %H:%M")
except:
pass
# 提取"人想要"的数字用于排序
want_count = self._extract_want_count(fish_tags_content)
return {
"item_id": item_id,
"title": title,
"price": price,
"seller_name": seller,
"item_url": raw_link.replace("fleamarket://", "https://www.goofish.com/"),
"main_image": f"https:{image_url}" if image_url and not image_url.startswith("http") else image_url,
"publish_time": publish_time,
"tags": [fish_tags_content] if fish_tags_content else [],
"area": area,
"want_count": want_count, # 添加想要人数用于排序
"raw_data": item_data
}
except Exception as e:
logger.warning(f"解析真实商品数据失败: {str(e)}")
return None
def _extract_want_count(self, tags_content: str) -> int:
"""从标签内容中提取"人想要"的数字"""
try:
if not tags_content or "人想要" not in tags_content:
return 0
# 使用正则表达式提取数字
import re
# 匹配类似 "123人想要" 或 "1.2万人想要" 的格式
pattern = r'(\d+(?:\.\d+)?(?:万)?)\s*人想要'
match = re.search(pattern, tags_content)
if match:
number_str = match.group(1)
if '' in number_str:
# 处理万单位
number = float(number_str.replace('', '')) * 10000
return int(number)
else:
return int(float(number_str))
return 0
except Exception as e:
logger.warning(f"提取想要人数失败: {str(e)}")
return 0
async def _navigate_to_page(self, target_page: int):
"""导航到指定页面"""
try:
logger.info(f"正在导航到第 {target_page} 页...")
# 等待页面稳定
await asyncio.sleep(2)
# 查找并点击下一页按钮
next_button_selectors = [
'.search-page-tiny-arrow-right--oXVFaRao', # 用户找到的正确选择器
'[class*="search-page-tiny-arrow-right"]', # 更通用的版本
'button[aria-label="下一页"]',
'button:has-text("下一页")',
'a:has-text("下一页")',
'.ant-pagination-next',
'li.ant-pagination-next a',
'a[aria-label="下一页"]',
'[class*="next"]',
'[class*="pagination-next"]',
'button[title="下一页"]',
'a[title="下一页"]'
]
# 从第2页开始点击
for current_page in range(2, target_page + 1):
logger.info(f"正在点击到第 {current_page} 页...")
next_button_found = False
for selector in next_button_selectors:
try:
next_button = self.page.locator(selector).first
if await next_button.is_visible(timeout=3000):
# 检查按钮是否可点击(不是禁用状态)
is_disabled = await next_button.get_attribute("disabled")
has_disabled_class = await next_button.evaluate("el => el.classList.contains('ant-pagination-disabled') || el.classList.contains('disabled')")
if not is_disabled and not has_disabled_class:
logger.info(f"找到下一页按钮,正在点击...")
# 滚动到按钮位置
await next_button.scroll_into_view_if_needed()
await asyncio.sleep(1)
# 点击下一页
await next_button.click()
await self.page.wait_for_load_state("networkidle", timeout=15000)
# 等待新数据加载
await asyncio.sleep(5)
logger.info(f"成功导航到第 {current_page}")
next_button_found = True
break
except Exception as e:
continue
if not next_button_found:
logger.warning(f"无法找到下一页按钮,停止在第 {current_page-1}")
break
except Exception as e:
logger.error(f"导航到第 {target_page} 页失败: {str(e)}")
async def search_multiple_pages(self, keyword: str, total_pages: int = 1) -> Dict[str, Any]:
"""
搜索多页闲鱼商品
Args:
keyword: 搜索关键词
total_pages: 总页数
Returns:
搜索结果字典包含所有页面的items列表和总数
"""
try:
if not PLAYWRIGHT_AVAILABLE:
logger.error("Playwright 不可用,无法获取真实数据")
return {
'items': [],
'total': 0,
'error': 'Playwright 不可用,无法获取真实数据'
}
logger.info(f"使用 Playwright 搜索多页闲鱼商品: 关键词='{keyword}', 总页数={total_pages}")
await self.init_browser()
# 清空之前的API响应
self.api_responses = []
all_data_list = []
# 设置API响应监听器
async def on_response(response):
"""处理API响应解析数据"""
if "h5api.m.goofish.com/h5/mtop.taobao.idlemtopsearch.pc.search" in response.url:
try:
# 检查响应状态
if response.status != 200:
logger.warning(f"API响应状态异常: {response.status}")
return
# 安全地获取响应内容
try:
result_json = await response.json()
except Exception as json_error:
logger.warning(f"无法解析响应JSON: {str(json_error)}")
return
self.api_responses.append(result_json)
logger.info(f"捕获到API响应URL: {response.url}")
items = result_json.get("data", {}).get("resultList", [])
logger.info(f"从API获取到 {len(items)} 条原始数据")
for item in items:
try:
parsed_item = await self._parse_real_item(item)
if parsed_item:
all_data_list.append(parsed_item)
except Exception as parse_error:
logger.warning(f"解析单个商品失败: {str(parse_error)}")
continue
except Exception as e:
logger.warning(f"响应处理异常: {str(e)}")
try:
logger.info("正在访问闲鱼首页...")
await self.page.goto("https://www.goofish.com", timeout=30000)
await self.page.wait_for_load_state("networkidle", timeout=10000)
logger.info(f"正在搜索关键词: {keyword}")
await self.page.fill('input[class*="search-input"]', keyword)
# 注册响应监听
self.page.on("response", on_response)
await self.page.click('button[type="submit"]')
await self.page.wait_for_load_state("networkidle", timeout=15000)
# 等待第一页API响应
logger.info("等待第一页API响应...")
await asyncio.sleep(5)
# 尝试处理弹窗
try:
await self.page.keyboard.press('Escape')
await asyncio.sleep(1)
except:
pass
# 等待更多数据
await asyncio.sleep(3)
first_page_count = len(all_data_list)
logger.info(f"第1页完成获取到 {first_page_count} 条数据")
# 如果需要获取更多页数据
if total_pages > 1:
for page_num in range(2, total_pages + 1):
logger.info(f"正在获取第 {page_num} 页数据...")
# 等待页面稳定
await asyncio.sleep(2)
# 查找并点击下一页按钮
next_button_found = False
next_button_selectors = [
'.search-page-tiny-arrow-right--oXVFaRao',
'[class*="search-page-tiny-arrow-right"]',
'button[aria-label="下一页"]',
'button:has-text("下一页")',
'a:has-text("下一页")',
'.ant-pagination-next',
'li.ant-pagination-next a',
'a[aria-label="下一页"]'
]
for selector in next_button_selectors:
try:
next_button = self.page.locator(selector).first
if await next_button.is_visible(timeout=3000):
# 检查按钮是否可点击
is_disabled = await next_button.get_attribute("disabled")
has_disabled_class = await next_button.evaluate("el => el.classList.contains('ant-pagination-disabled') || el.classList.contains('disabled')")
if not is_disabled and not has_disabled_class:
logger.info(f"找到下一页按钮,正在点击到第 {page_num} 页...")
# 记录点击前的数据量
before_click_count = len(all_data_list)
# 滚动到按钮位置并点击
await next_button.scroll_into_view_if_needed()
await asyncio.sleep(1)
await next_button.click()
await self.page.wait_for_load_state("networkidle", timeout=15000)
# 等待新数据加载
await asyncio.sleep(5)
# 检查是否有新数据
after_click_count = len(all_data_list)
new_items = after_click_count - before_click_count
if new_items > 0:
logger.info(f"{page_num} 页成功,新增 {new_items} 条数据")
next_button_found = True
break
else:
logger.warning(f"{page_num} 页点击后没有新数据,可能已到最后一页")
next_button_found = False
break
except Exception as e:
continue
if not next_button_found:
logger.warning(f"无法获取第 {page_num} 页数据,停止在第 {page_num-1}")
break
# 根据"人想要"数量进行倒序排列
all_data_list.sort(key=lambda x: x.get('want_count', 0), reverse=True)
total_count = len(all_data_list)
logger.info(f"多页搜索完成,总共获取到 {total_count} 条真实数据,已按想要人数排序")
return {
'items': all_data_list,
'total': total_count,
'is_real_data': True,
'source': 'playwright'
}
finally:
await self.close_browser()
except Exception as e:
error_msg = str(e)
logger.error(f"Playwright 多页搜索失败: {error_msg}")
# 检查是否是浏览器安装问题
if "Executable doesn't exist" in error_msg or "playwright install" in error_msg:
error_msg = "浏览器未安装。请在Docker容器中运行: playwright install chromium"
elif "BrowserType.launch" in error_msg:
error_msg = "浏览器启动失败。请确保Docker容器有足够的权限和资源"
# 如果 Playwright 失败,返回错误信息
return {
'items': [],
'total': 0,
'error': f'多页搜索失败: {error_msg}'
}
async def _get_multiple_fallback_data(self, keyword: str, total_pages: int) -> Dict[str, Any]:
"""获取多页备选数据(模拟数据)"""
logger.info(f"使用多页备选数据: 关键词='{keyword}', 总页数={total_pages}")
# 模拟搜索延迟
await asyncio.sleep(1)
# 生成多页模拟数据
all_mock_items = []
for page in range(1, total_pages + 1):
page_size = 20 # 每页20条
start_index = (page - 1) * page_size
for i in range(page_size):
item_index = start_index + i + 1
all_mock_items.append({
'item_id': f'mock_{keyword}_{item_index}',
'title': f'{keyword}相关商品 #{item_index} [模拟数据-第{page}页]',
'price': f'{100 + item_index * 10}',
'seller_name': f'卖家{item_index}',
'item_url': f'https://www.goofish.com/item?id=mock_{keyword}_{item_index}',
'publish_time': '2025-07-28',
'tags': [f'标签{i+1}', f'分类{i+1}'],
'main_image': f'https://via.placeholder.com/200x200?text={keyword}商品{item_index}',
'raw_data': {
'mock': True,
'keyword': keyword,
'index': item_index,
'page': page
}
})
total_count = len(all_mock_items)
logger.info(f"多页备选数据生成完成: 找到{total_count}个商品,共{total_pages}")
return {
'items': all_mock_items,
'total': total_count,
'is_fallback': True
}
async def _parse_item_old(self, item_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
解析单个商品数据
Args:
item_data: 商品原始数据
Returns:
解析后的商品信息
"""
try:
# 基本信息 - 适配多种可能的字段名
item_id = (item_data.get('id') or
item_data.get('itemId') or
item_data.get('item_id') or
item_data.get('spuId') or '')
title = (item_data.get('title') or
item_data.get('name') or
item_data.get('itemTitle') or
item_data.get('subject') or '')
# 价格处理
price_raw = (item_data.get('price') or
item_data.get('priceText') or
item_data.get('currentPrice') or
item_data.get('realPrice') or '')
# 清理价格格式
if isinstance(price_raw, (int, float)):
price = str(price_raw)
elif isinstance(price_raw, str):
price = price_raw.replace('¥', '').replace('', '').strip()
else:
price = '价格面议'
# 卖家信息
seller_info = (item_data.get('seller') or
item_data.get('user') or
item_data.get('owner') or {})
seller_name = ''
if seller_info:
seller_name = (seller_info.get('nick') or
seller_info.get('nickname') or
seller_info.get('name') or
seller_info.get('userName') or '匿名用户')
# 商品链接
if item_id:
item_url = f"https://www.goofish.com/item?id={item_id}"
else:
item_url = item_data.get('url', item_data.get('link', ''))
# 发布时间
publish_time = (item_data.get('publishTime') or
item_data.get('createTime') or
item_data.get('gmtCreate') or
item_data.get('time') or '')
# 格式化时间
if publish_time and isinstance(publish_time, (int, float)):
import datetime
publish_time = datetime.datetime.fromtimestamp(publish_time / 1000).strftime('%Y-%m-%d')
# 商品标签
tags = item_data.get('tags', item_data.get('labels', []))
tag_names = []
if isinstance(tags, list):
for tag in tags:
if isinstance(tag, dict):
tag_name = (tag.get('name') or
tag.get('text') or
tag.get('label') or '')
if tag_name:
tag_names.append(tag_name)
elif isinstance(tag, str):
tag_names.append(tag)
# 图片
images = (item_data.get('images') or
item_data.get('pics') or
item_data.get('pictures') or
item_data.get('imgList') or [])
main_image = ''
if images and len(images) > 0:
if isinstance(images[0], str):
main_image = images[0]
elif isinstance(images[0], dict):
main_image = (images[0].get('url') or
images[0].get('src') or
images[0].get('image') or '')
# 如果没有图片,使用默认占位图
if not main_image:
main_image = f'https://via.placeholder.com/200x200?text={title[:10] if title else "商品"}...'
return {
'item_id': item_id,
'title': title,
'price': price,
'seller_name': seller_name,
'item_url': item_url,
'publish_time': publish_time,
'tags': tag_names,
'main_image': main_image,
'raw_data': item_data # 保留原始数据以备后用
}
except Exception as e:
logger.warning(f"解析商品数据失败: {str(e)}")
return None
# 全局搜索器实例
_searcher = None
async def search_xianyu_items(keyword: str, page: int = 1, page_size: int = 20) -> Dict[str, Any]:
"""
搜索闲鱼商品的便捷函数
Args:
keyword: 搜索关键词
page: 页码
page_size: 每页数量
Returns:
搜索结果
"""
global _searcher
if not _searcher:
_searcher = XianyuSearcher()
try:
return await _searcher.search_items(keyword, page, page_size)
except Exception as e:
logger.error(f"搜索商品失败: {str(e)}")
return {
'items': [],
'total': 0,
'error': str(e)
}
async def search_multiple_pages_xianyu(keyword: str, total_pages: int = 1) -> Dict[str, Any]:
"""
搜索多页闲鱼商品的便捷函数
Args:
keyword: 搜索关键词
total_pages: 总页数
Returns:
搜索结果
"""
global _searcher
if not _searcher:
_searcher = XianyuSearcher()
try:
return await _searcher.search_multiple_pages(keyword, total_pages)
except Exception as e:
logger.error(f"多页搜索商品失败: {str(e)}")
return {
'items': [],
'total': 0,
'error': str(e)
}
async def close_searcher():
"""关闭搜索器"""
global _searcher
if _searcher:
await _searcher.close_session()
_searcher = None
async def get_item_detail_from_api(item_id: str) -> Optional[str]:
"""
从外部API获取商品详情
Args:
item_id: 商品ID
Returns:
商品详情文本获取失败返回None
"""
try:
# 使用默认的API配置
api_base_url = 'https://selfapi.zhinianboke.com/api/getItemDetail'
timeout_seconds = 10
api_url = f"{api_base_url}/{item_id}"
logger.info(f"正在从外部API获取商品详情: {item_id}")
# 使用简单的HTTP请求
import aiohttp
timeout = aiohttp.ClientTimeout(total=timeout_seconds)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(api_url) as response:
if response.status == 200:
result = await response.json()
# 检查返回状态
if result.get('status') == '200' and result.get('data'):
item_detail = result['data']
logger.info(f"成功获取商品详情: {item_id}, 长度: {len(item_detail)}")
return item_detail
else:
logger.warning(f"API返回状态异常: {result.get('status')}, message: {result.get('message')}")
return None
else:
logger.warning(f"API请求失败: HTTP {response.status}")
return None
except asyncio.TimeoutError:
logger.warning(f"获取商品详情超时: {item_id}")
return None
except Exception as e:
logger.error(f"获取商品详情异常: {item_id}, 错误: {str(e)}")
return None