import asyncio import json import time import base64 import os from loguru import logger import websockets from utils.xianyu_utils import ( decrypt, generate_mid, generate_uuid, trans_cookies, generate_device_id, generate_sign ) from config import ( WEBSOCKET_URL, HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT, TOKEN_REFRESH_INTERVAL, TOKEN_RETRY_INTERVAL, config, COOKIES_STR, LOG_CONFIG, AUTO_REPLY, DEFAULT_HEADERS, WEBSOCKET_HEADERS, APP_CONFIG, API_ENDPOINTS ) from utils.message_utils import format_message, format_system_message from utils.ws_utils import WebSocketClient import sys import aiohttp # 日志配置 log_dir = 'logs' os.makedirs(log_dir, exist_ok=True) log_path = os.path.join(log_dir, f"xianyu_{time.strftime('%Y-%m-%d')}.log") logger.remove() logger.add( log_path, rotation=LOG_CONFIG.get('rotation', '1 day'), retention=LOG_CONFIG.get('retention', '7 days'), compression=LOG_CONFIG.get('compression', 'zip'), level=LOG_CONFIG.get('level', 'INFO'), format=LOG_CONFIG.get('format', '{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}'), encoding='utf-8', enqueue=True ) logger.add( sys.stdout, level=LOG_CONFIG.get('level', 'INFO'), format=LOG_CONFIG.get('format', '{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}'), enqueue=True ) class XianyuLive: def _safe_str(self, e): """安全地将异常转换为字符串""" try: return str(e) except: try: return repr(e) except: return "未知错误" def __init__(self, cookies_str=None, cookie_id: str = "default", user_id: int = None): """初始化闲鱼直播类""" if not cookies_str: cookies_str = COOKIES_STR if not cookies_str: raise ValueError("未提供cookies,请在global_config.yml中配置COOKIES_STR或通过参数传入") self.cookies = trans_cookies(cookies_str) self.cookie_id = cookie_id # 唯一账号标识 self.cookies_str = cookies_str # 保存原始cookie字符串 self.user_id = user_id # 保存用户ID,用于token刷新时保持正确的所有者关系 self.base_url = WEBSOCKET_URL self.myid = self.cookies['unb'] self.device_id = generate_device_id(self.myid) # 心跳相关配置 self.heartbeat_interval = HEARTBEAT_INTERVAL self.heartbeat_timeout = HEARTBEAT_TIMEOUT self.last_heartbeat_time = 0 self.last_heartbeat_response = 0 self.heartbeat_task = None self.ws = None # Token刷新相关配置 self.token_refresh_interval = TOKEN_REFRESH_INTERVAL self.token_retry_interval = TOKEN_RETRY_INTERVAL self.last_token_refresh_time = 0 self.current_token = None self.token_refresh_task = None self.connection_restart_flag = False # 连接重启标志 # 通知防重复机制 self.last_notification_time = {} # 记录每种通知类型的最后发送时间 self.notification_cooldown = 300 # 5分钟内不重复发送相同类型的通知 # 自动发货防重复机制 self.last_delivery_time = {} # 记录每个商品的最后发货时间 self.delivery_cooldown = 60 # 1分钟内不重复发货 # 人工接管功能已禁用,永远走自动模式 # self.manual_mode_conversations = set() # 存储处于人工接管模式的会话ID # self.manual_mode_timeout = MANUAL_MODE.get('timeout', 3600) # 人工接管超时时间,默认1小时 # self.manual_mode_timestamps = {} # 记录进入人工模式的时间 # self.toggle_keywords = MANUAL_MODE.get('toggle_keywords', ['。']) # 切换关键词 self.session = None # 用于API调用的aiohttp session def can_auto_delivery(self, item_id: str) -> bool: """检查是否可以进行自动发货(防重复发货)""" current_time = time.time() last_delivery = self.last_delivery_time.get(item_id, 0) if current_time - last_delivery < self.delivery_cooldown: logger.info(f"【{self.cookie_id}】商品 {item_id} 在冷却期内,跳过自动发货") return False return True def mark_delivery_sent(self, item_id: str): """标记商品已发货""" self.last_delivery_time[item_id] = time.time() logger.debug(f"【{self.cookie_id}】标记商品 {item_id} 已发货") # 人工接管功能已禁用,以下方法不再使用 # def check_toggle_keywords(self, message): # """检查消息是否包含切换关键词""" # return any(keyword in message for keyword in self.toggle_keywords) # def is_manual_mode(self, chat_id): # """检查是否处于人工接管模式""" # if chat_id in self.manual_mode_conversations: # # 检查是否超时 # if time.time() - self.manual_mode_timestamps.get(chat_id, 0) > self.manual_mode_timeout: # self.exit_manual_mode(chat_id) # return False # return True # return False # def enter_manual_mode(self, chat_id): # """进入人工接管模式""" # self.manual_mode_conversations.add(chat_id) # self.manual_mode_timestamps[chat_id] = time.time() # def exit_manual_mode(self, chat_id): # """退出人工接管模式""" # self.manual_mode_conversations.discard(chat_id) # self.manual_mode_timestamps.pop(chat_id, None) # def toggle_manual_mode(self, chat_id): # """切换人工接管模式""" # if self.is_manual_mode(chat_id): # self.exit_manual_mode(chat_id) # return False # else: # self.enter_manual_mode(chat_id) # return True async def refresh_token(self): """刷新token""" try: logger.info(f"【{self.cookie_id}】开始刷新token...") params = { 'jsv': '2.7.2', 'appKey': '34839810', 't': str(int(time.time()) * 1000), 'sign': '', 'v': '1.0', 'type': 'originaljson', 'accountSite': 'xianyu', 'dataType': 'json', 'timeout': '20000', 'api': 'mtop.taobao.idlemessage.pc.login.token', 'sessionOption': 'AutoLoginOnly', 'spm_cnt': 'a21ybx.im.0.0', } data_val = '{"appKey":"444e9908a51d1cb236a27862abc769c9","deviceId":"' + self.device_id + '"}' data = { 'data': data_val, } # 获取token token = None if '_m_h5_tk' in self.cookies: token = self.cookies['_m_h5_tk'].split('_')[0] sign = generate_sign(params['t'], token, data_val) params['sign'] = sign # 发送请求 headers = DEFAULT_HEADERS.copy() headers['cookie'] = self.cookies_str async with aiohttp.ClientSession() as session: async with session.post( API_ENDPOINTS.get('token'), params=params, data=data, headers=headers ) as response: res_json = await response.json() # 检查并更新Cookie if 'set-cookie' in response.headers: new_cookies = {} for cookie in response.headers.getall('set-cookie', []): if '=' in cookie: name, value = cookie.split(';')[0].split('=', 1) new_cookies[name.strip()] = value.strip() # 更新cookies if new_cookies: self.cookies.update(new_cookies) # 生成新的cookie字符串 self.cookies_str = '; '.join([f"{k}={v}" for k, v in self.cookies.items()]) # 更新数据库中的Cookie await self.update_config_cookies() logger.debug("已更新Cookie到数据库") if isinstance(res_json, dict): ret_value = res_json.get('ret', []) # 检查ret是否包含成功信息 if any('SUCCESS::调用成功' in ret for ret in ret_value): if 'data' in res_json and 'accessToken' in res_json['data']: new_token = res_json['data']['accessToken'] self.current_token = new_token self.last_token_refresh_time = time.time() logger.info(f"【{self.cookie_id}】Token刷新成功") return new_token logger.error(f"【{self.cookie_id}】Token刷新失败: {res_json}") # 发送Token刷新失败通知 await self.send_token_refresh_notification(f"Token刷新失败: {res_json}", "token_refresh_failed") return None except Exception as e: logger.error(f"Token刷新异常: {self._safe_str(e)}") # 发送Token刷新异常通知 await self.send_token_refresh_notification(f"Token刷新异常: {str(e)}", "token_refresh_exception") return None async def update_config_cookies(self): """更新数据库中的cookies""" try: from db_manager import db_manager # 更新数据库中的Cookie if hasattr(self, 'cookie_id') and self.cookie_id: try: # 获取当前Cookie的用户ID,避免在刷新时改变所有者 current_user_id = None if hasattr(self, 'user_id') and self.user_id: current_user_id = self.user_id db_manager.save_cookie(self.cookie_id, self.cookies_str, current_user_id) logger.debug(f"已更新Cookie到数据库: {self.cookie_id}") except Exception as e: logger.error(f"更新数据库Cookie失败: {self._safe_str(e)}") # 发送数据库更新失败通知 await self.send_token_refresh_notification(f"数据库Cookie更新失败: {str(e)}", "db_update_failed") else: logger.warning("Cookie ID不存在,无法更新数据库") # 发送Cookie ID缺失通知 await self.send_token_refresh_notification("Cookie ID不存在,无法更新数据库", "cookie_id_missing") except Exception as e: logger.error(f"更新Cookie失败: {self._safe_str(e)}") # 发送Cookie更新失败通知 await self.send_token_refresh_notification(f"Cookie更新失败: {str(e)}", "cookie_update_failed") async def save_item_info_to_db(self, item_id: str, item_detail: str = None): """保存商品信息到数据库 Args: item_id: 商品ID item_detail: 商品详情内容(可以是任意格式的文本) """ try: # 跳过以 auto_ 开头的商品ID if item_id and item_id.startswith('auto_'): logger.debug(f"跳过保存自动生成的商品ID: {item_id}") return from db_manager import db_manager # 直接使用传入的详情内容 item_data = item_detail # 保存到数据库 success = db_manager.save_item_info(self.cookie_id, item_id, item_data) if success: logger.info(f"商品信息已保存到数据库: {item_id}") else: logger.warning(f"保存商品信息到数据库失败: {item_id}") except Exception as e: logger.error(f"保存商品信息到数据库异常: {self._safe_str(e)}") async def save_item_detail_only(self, item_id, item_detail): """仅保存商品详情(不影响标题等基本信息)""" try: from db_manager import db_manager # 使用专门的详情更新方法 success = db_manager.update_item_detail(self.cookie_id, item_id, item_detail) if success: logger.info(f"商品详情已更新: {item_id}") else: logger.warning(f"更新商品详情失败: {item_id}") return success except Exception as e: logger.error(f"更新商品详情异常: {self._safe_str(e)}") return False async def fetch_item_detail_from_api(self, item_id: str) -> str: """从外部API获取商品详情 Args: item_id: 商品ID Returns: str: 商品详情文本,获取失败返回空字符串 """ try: # 检查是否启用自动获取功能 from config import config auto_fetch_config = config.get('ITEM_DETAIL', {}).get('auto_fetch', {}) if not auto_fetch_config.get('enabled', True): logger.debug(f"自动获取商品详情功能已禁用: {item_id}") return "" # 从配置获取API地址和超时时间 api_base_url = auto_fetch_config.get('api_url', 'https://selfapi.zhinianboke.com/api/getItemDetail') timeout_seconds = auto_fetch_config.get('timeout', 10) api_url = f"{api_base_url}/{item_id}" logger.info(f"正在从外部API获取商品详情: {item_id}") # 使用aiohttp发送异步请求 import aiohttp import asyncio timeout = aiohttp.ClientTimeout(total=timeout_seconds) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(api_url) as response: if response.status == 200: result = await response.json() # 检查返回状态 if result.get('status') == '200' and result.get('data'): item_detail = result['data'] logger.info(f"成功获取商品详情: {item_id}, 长度: {len(item_detail)}") logger.debug(f"商品详情内容: {item_detail[:200]}...") return item_detail else: logger.warning(f"API返回状态异常: {result.get('status')}, message: {result.get('message')}") return "" else: logger.warning(f"API请求失败: HTTP {response.status}") return "" except asyncio.TimeoutError: logger.warning(f"获取商品详情超时: {item_id}") return "" except Exception as e: logger.error(f"获取商品详情异常: {item_id}, 错误: {self._safe_str(e)}") return "" async def save_items_list_to_db(self, items_list): """批量保存商品列表信息到数据库(并发安全) Args: items_list: 从get_item_list_info获取的商品列表 """ try: from db_manager import db_manager # 准备批量数据 batch_data = [] items_need_detail = [] # 需要获取详情的商品列表 for item in items_list: item_id = item.get('id') if not item_id or item_id.startswith('auto_'): continue # 构造商品详情数据 item_detail = { 'title': item.get('title', ''), 'price': item.get('price', ''), 'price_text': item.get('price_text', ''), 'category_id': item.get('category_id', ''), 'auction_type': item.get('auction_type', ''), 'item_status': item.get('item_status', 0), 'detail_url': item.get('detail_url', ''), 'pic_info': item.get('pic_info', {}), 'detail_params': item.get('detail_params', {}), 'track_params': item.get('track_params', {}), 'item_label_data': item.get('item_label_data', {}), 'card_type': item.get('card_type', 0) } # 检查数据库中是否已有详情 existing_item = db_manager.get_item_info(self.cookie_id, item_id) has_detail = existing_item and existing_item.get('item_detail') and existing_item['item_detail'].strip() batch_data.append({ 'cookie_id': self.cookie_id, 'item_id': item_id, 'item_title': item.get('title', ''), 'item_description': '', # 暂时为空 'item_category': str(item.get('category_id', '')), 'item_price': item.get('price_text', ''), 'item_detail': json.dumps(item_detail, ensure_ascii=False) }) # 如果没有详情,添加到需要获取详情的列表 if not has_detail: items_need_detail.append({ 'item_id': item_id, 'item_title': item.get('title', '') }) if not batch_data: logger.info("没有有效的商品数据需要保存") return 0 # 使用批量保存方法(并发安全) saved_count = db_manager.batch_save_item_basic_info(batch_data) logger.info(f"批量保存商品信息完成: {saved_count}/{len(batch_data)} 个商品") # 异步获取缺失的商品详情 if items_need_detail: from config import config auto_fetch_config = config.get('ITEM_DETAIL', {}).get('auto_fetch', {}) if auto_fetch_config.get('enabled', True): logger.info(f"发现 {len(items_need_detail)} 个商品缺少详情,开始获取...") detail_success_count = await self._fetch_missing_item_details(items_need_detail) logger.info(f"成功获取 {detail_success_count}/{len(items_need_detail)} 个商品的详情") else: logger.info(f"发现 {len(items_need_detail)} 个商品缺少详情,但自动获取功能已禁用") return saved_count except Exception as e: logger.error(f"批量保存商品信息异常: {self._safe_str(e)}") return 0 async def _fetch_missing_item_details(self, items_need_detail): """批量获取缺失的商品详情 Args: items_need_detail: 需要获取详情的商品列表 Returns: int: 成功获取详情的商品数量 """ success_count = 0 try: from db_manager import db_manager from config import config # 从配置获取并发数量和延迟时间 auto_fetch_config = config.get('ITEM_DETAIL', {}).get('auto_fetch', {}) max_concurrent = auto_fetch_config.get('max_concurrent', 3) retry_delay = auto_fetch_config.get('retry_delay', 0.5) # 限制并发数量,避免对API服务器造成压力 semaphore = asyncio.Semaphore(max_concurrent) async def fetch_single_item_detail(item_info): async with semaphore: try: item_id = item_info['item_id'] item_title = item_info['item_title'] # 获取商品详情 item_detail_text = await self.fetch_item_detail_from_api(item_id) if item_detail_text: # 保存详情到数据库 success = await self.save_item_detail_only(item_id, item_detail_text) if success: logger.info(f"✅ 成功获取并保存商品详情: {item_id} - {item_title}") return 1 else: logger.warning(f"❌ 获取详情成功但保存失败: {item_id}") else: logger.warning(f"❌ 未能获取商品详情: {item_id} - {item_title}") # 添加延迟,避免请求过于频繁 await asyncio.sleep(retry_delay) return 0 except Exception as e: logger.error(f"获取单个商品详情异常: {item_info.get('item_id', 'unknown')}, 错误: {self._safe_str(e)}") return 0 # 并发获取所有商品详情 tasks = [fetch_single_item_detail(item_info) for item_info in items_need_detail] results = await asyncio.gather(*tasks, return_exceptions=True) # 统计成功数量 for result in results: if isinstance(result, int): success_count += result elif isinstance(result, Exception): logger.error(f"获取商品详情任务异常: {result}") return success_count except Exception as e: logger.error(f"批量获取商品详情异常: {self._safe_str(e)}") return success_count async def get_item_info(self, item_id, retry_count=0): """获取商品信息,自动处理token失效的情况""" if retry_count >= 3: # 最多重试3次 logger.error("获取商品信息失败,重试次数过多") return {"error": "获取商品信息失败,重试次数过多"} # 如果是重试(retry_count > 0),强制刷新token if retry_count > 0: old_token = self.cookies.get('_m_h5_tk', '').split('_')[0] if self.cookies.get('_m_h5_tk') else '' logger.info(f"重试第{retry_count}次,强制刷新token... 当前_m_h5_tk: {old_token}") await self.refresh_token() new_token = self.cookies.get('_m_h5_tk', '').split('_')[0] if self.cookies.get('_m_h5_tk') else '' logger.info(f"重试刷新token完成,新的_m_h5_tk: {new_token}") else: # 确保使用最新的token(首次调用时的正常逻辑) if not self.current_token or (time.time() - self.last_token_refresh_time) >= self.token_refresh_interval: old_token = self.cookies.get('_m_h5_tk', '').split('_')[0] if self.cookies.get('_m_h5_tk') else '' logger.info(f"Token过期或不存在,刷新token... 当前_m_h5_tk: {old_token}") await self.refresh_token() new_token = self.cookies.get('_m_h5_tk', '').split('_')[0] if self.cookies.get('_m_h5_tk') else '' logger.info(f"Token刷新完成,新的_m_h5_tk: {new_token}") # 确保session已创建 if not self.session: await self.create_session() params = { 'jsv': '2.7.2', 'appKey': '34839810', 't': str(int(time.time()) * 1000), 'sign': '', 'v': '1.0', 'type': 'originaljson', 'accountSite': 'xianyu', 'dataType': 'json', 'timeout': '20000', 'api': 'mtop.taobao.idle.pc.detail', 'sessionOption': 'AutoLoginOnly', 'spm_cnt': 'a21ybx.im.0.0', } data_val = '{"itemId":"' + item_id + '"}' data = { 'data': data_val, } # 始终从最新的cookies中获取_m_h5_tk token(刷新后cookies会被更新) token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else '' logger.warning(111) logger.warning(token) if token: logger.debug(f"使用cookies中的_m_h5_tk token: {token}") else: logger.warning("cookies中没有找到_m_h5_tk token") from utils.xianyu_utils import generate_sign sign = generate_sign(params['t'], token, data_val) params['sign'] = sign try: async with self.session.post( 'https://h5api.m.goofish.com/h5/mtop.taobao.idle.pc.detail/1.0/', params=params, data=data ) as response: res_json = await response.json() logger.debug(f"商品信息获取成功: {res_json}") # 检查返回状态 if isinstance(res_json, dict): ret_value = res_json.get('ret', []) # 检查ret是否包含成功信息 if not any('SUCCESS::调用成功' in ret for ret in ret_value): logger.warning(f"商品信息API调用失败,错误信息: {ret_value}") # 处理响应中的Set-Cookie if 'Set-Cookie' in response.headers: logger.debug("检测到Set-Cookie,需要更新cookie") # TODO: 实现cookie更新逻辑 await asyncio.sleep(0.5) return await self.get_item_info(item_id, retry_count + 1) else: logger.debug(f"商品信息获取成功: {item_id}") return res_json else: logger.error(f"商品信息API返回格式异常: {res_json}") return await self.get_item_info(item_id, retry_count + 1) except Exception as e: logger.error(f"商品信息API请求异常: {self._safe_str(e)}") await asyncio.sleep(0.5) return await self.get_item_info(item_id, retry_count + 1) def extract_item_id_from_message(self, message): """从消息中提取商品ID的辅助方法""" try: # 方法1: 从message["1"]中提取(如果是字符串格式) message_1 = message.get('1') if isinstance(message_1, str): # 尝试从字符串中提取数字ID import re id_match = re.search(r'(\d{10,})', message_1) if id_match: logger.info(f"从message[1]字符串中提取商品ID: {id_match.group(1)}") return id_match.group(1) # 方法2: 从message["3"]中提取 message_3 = message.get('3', {}) if isinstance(message_3, dict): # 从extension中提取 if 'extension' in message_3: extension = message_3['extension'] if isinstance(extension, dict): item_id = extension.get('itemId') or extension.get('item_id') if item_id: logger.info(f"从extension中提取商品ID: {item_id}") return item_id # 从bizData中提取 if 'bizData' in message_3: biz_data = message_3['bizData'] if isinstance(biz_data, dict): item_id = biz_data.get('itemId') or biz_data.get('item_id') if item_id: logger.info(f"从bizData中提取商品ID: {item_id}") return item_id # 从其他可能的字段中提取 for key, value in message_3.items(): if isinstance(value, dict): item_id = value.get('itemId') or value.get('item_id') if item_id: logger.info(f"从{key}字段中提取商品ID: {item_id}") return item_id # 从消息内容中提取数字ID content = message_3.get('content', '') if isinstance(content, str) and content: import re id_match = re.search(r'(\d{10,})', content) if id_match: logger.info(f"【{self.cookie_id}】从消息内容中提取商品ID: {id_match.group(1)}") return id_match.group(1) # 方法3: 遍历整个消息结构查找可能的商品ID def find_item_id_recursive(obj, path=""): if isinstance(obj, dict): # 直接查找itemId字段 for key in ['itemId', 'item_id', 'id']: if key in obj and isinstance(obj[key], (str, int)): value = str(obj[key]) if len(value) >= 10 and value.isdigit(): logger.info(f"从{path}.{key}中提取商品ID: {value}") return value # 递归查找 for key, value in obj.items(): result = find_item_id_recursive(value, f"{path}.{key}" if path else key) if result: return result elif isinstance(obj, str): # 从字符串中提取可能的商品ID import re id_match = re.search(r'(\d{10,})', obj) if id_match: logger.info(f"从{path}字符串中提取商品ID: {id_match.group(1)}") return id_match.group(1) return None result = find_item_id_recursive(message) if result: return result logger.debug("所有方法都未能提取到商品ID") return None except Exception as e: logger.error(f"提取商品ID失败: {self._safe_str(e)}") return None def debug_message_structure(self, message, context=""): """调试消息结构的辅助方法""" try: logger.debug(f"[{context}] 消息结构调试:") logger.debug(f" 消息类型: {type(message)}") if isinstance(message, dict): for key, value in message.items(): logger.debug(f" 键 '{key}': {type(value)} - {str(value)[:100]}...") # 特别关注可能包含商品ID的字段 if key in ["1", "3"] and isinstance(value, dict): logger.debug(f" 详细结构 '{key}':") for sub_key, sub_value in value.items(): logger.debug(f" '{sub_key}': {type(sub_value)} - {str(sub_value)[:50]}...") else: logger.debug(f" 消息内容: {str(message)[:200]}...") except Exception as e: logger.error(f"调试消息结构时发生错误: {self._safe_str(e)}") async def get_default_reply(self, send_user_name: str, send_user_id: str, send_message: str) -> str: """获取默认回复内容,支持变量替换""" try: from db_manager import db_manager # 获取当前账号的默认回复设置 default_reply_settings = db_manager.get_default_reply(self.cookie_id) if not default_reply_settings or not default_reply_settings.get('enabled', False): logger.debug(f"账号 {self.cookie_id} 未启用默认回复") return None reply_content = default_reply_settings.get('reply_content', '') if not reply_content: logger.warning(f"账号 {self.cookie_id} 默认回复内容为空") return None # 进行变量替换 try: formatted_reply = reply_content.format( send_user_name=send_user_name, send_user_id=send_user_id, send_message=send_message ) logger.info(f"【{self.cookie_id}】使用默认回复: {formatted_reply}") return formatted_reply except Exception as format_error: logger.error(f"默认回复变量替换失败: {self._safe_str(format_error)}") # 如果变量替换失败,返回原始内容 return reply_content except Exception as e: logger.error(f"获取默认回复失败: {self._safe_str(e)}") return None async def get_keyword_reply(self, send_user_name: str, send_user_id: str, send_message: str) -> str: """获取关键词匹配回复""" try: from db_manager import db_manager # 获取当前账号的关键词列表 keywords = db_manager.get_keywords(self.cookie_id) if not keywords: logger.debug(f"账号 {self.cookie_id} 没有配置关键词") return None # 遍历关键词,查找匹配 for keyword, reply in keywords: if keyword.lower() in send_message.lower(): # 进行变量替换 try: formatted_reply = reply.format( send_user_name=send_user_name, send_user_id=send_user_id, send_message=send_message ) logger.info(f"关键词匹配成功: '{keyword}' -> {formatted_reply}") return formatted_reply except Exception as format_error: logger.error(f"关键词回复变量替换失败: {self._safe_str(format_error)}") # 如果变量替换失败,返回原始内容 return reply logger.debug(f"未找到匹配的关键词: {send_message}") return None except Exception as e: logger.error(f"获取关键词回复失败: {self._safe_str(e)}") return None async def get_ai_reply(self, send_user_name: str, send_user_id: str, send_message: str, item_id: str, chat_id: str): """获取AI回复""" try: from ai_reply_engine import ai_reply_engine # 检查是否启用AI回复 if not ai_reply_engine.is_ai_enabled(self.cookie_id): logger.debug(f"账号 {self.cookie_id} 未启用AI回复") return None # 从数据库获取商品信息 from db_manager import db_manager item_info_raw = db_manager.get_item_info(self.cookie_id, item_id) if not item_info_raw: logger.debug(f"数据库中无商品信息: {item_id}") # 使用默认商品信息 item_info = { 'title': '商品信息获取失败', 'price': 0, 'desc': '暂无商品描述' } else: # 解析数据库中的商品信息 item_info = { 'title': item_info_raw.get('item_title', '未知商品'), 'price': self._parse_price(item_info_raw.get('item_price', '0')), 'desc': item_info_raw.get('item_description', '暂无商品描述') } # 生成AI回复 reply = ai_reply_engine.generate_reply( message=send_message, item_info=item_info, chat_id=chat_id, cookie_id=self.cookie_id, user_id=send_user_id, item_id=item_id ) if reply: logger.info(f"【{self.cookie_id}】AI回复生成成功: {reply}") return reply else: logger.debug(f"AI回复生成失败") return None except Exception as e: logger.error(f"获取AI回复失败: {self._safe_str(e)}") return None def _parse_price(self, price_str: str) -> float: """解析价格字符串为数字""" try: if not price_str: return 0.0 # 移除非数字字符,保留小数点 import re price_clean = re.sub(r'[^\d.]', '', str(price_str)) return float(price_clean) if price_clean else 0.0 except: return 0.0 async def send_notification(self, send_user_name: str, send_user_id: str, send_message: str, item_id: str = None): """发送消息通知""" try: from db_manager import db_manager import aiohttp import json # 获取当前账号的通知配置 notifications = db_manager.get_account_notifications(self.cookie_id) if not notifications: logger.debug(f"账号 {self.cookie_id} 未配置消息通知") return # 构建通知消息 notification_msg = f"🚨 接收消息通知\n\n" \ f"账号: {self.cookie_id}\n" \ f"买家: {send_user_name} (ID: {send_user_id})\n" \ f"商品ID: {item_id or '未知'}\n" \ f"消息内容: {send_message}\n" \ f"时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n" # 发送通知到各个渠道 for notification in notifications: if not notification.get('enabled', True): continue channel_type = notification.get('channel_type') channel_config = notification.get('channel_config') try: if channel_type == 'qq': await self._send_qq_notification(channel_config, notification_msg) else: logger.warning(f"不支持的通知渠道类型: {channel_type}") except Exception as notify_error: logger.error(f"发送通知失败 ({notification.get('channel_name', 'Unknown')}): {self._safe_str(notify_error)}") except Exception as e: logger.error(f"处理消息通知失败: {self._safe_str(e)}") async def _send_qq_notification(self, config: str, message: str): """发送QQ通知""" try: import aiohttp import json # 解析配置(QQ号码) qq_number = config.strip() if not qq_number: logger.warning("QQ通知配置为空") return # 构建请求URL api_url = "http://notice.zhinianblog.cn/sendPrivateMsg" params = { 'qq': qq_number, 'msg': message } # 发送GET请求 async with aiohttp.ClientSession() as session: async with session.get(api_url, params=params, timeout=10) as response: if response.status == 200: logger.info(f"QQ通知发送成功: {qq_number}") else: logger.warning(f"QQ通知发送失败: {response.status}") except Exception as e: logger.error(f"发送QQ通知异常: {self._safe_str(e)}") async def send_token_refresh_notification(self, error_message: str, notification_type: str = "token_refresh"): """发送Token刷新异常通知(带防重复机制)""" try: # 检查是否是正常的令牌过期,这种情况不需要发送通知 if self._is_normal_token_expiry(error_message): logger.debug(f"检测到正常的令牌过期,跳过通知: {error_message}") return # 检查是否在冷却期内 current_time = time.time() last_time = self.last_notification_time.get(notification_type, 0) if current_time - last_time < self.notification_cooldown: logger.debug(f"通知在冷却期内,跳过发送: {notification_type} (距离上次 {int(current_time - last_time)} 秒)") return from db_manager import db_manager # 获取当前账号的通知配置 notifications = db_manager.get_account_notifications(self.cookie_id) if not notifications: logger.debug("未配置消息通知,跳过Token刷新通知") return # 构造通知消息 notification_msg = f"""🔴 闲鱼账号Token刷新异常 账号ID: {self.cookie_id} 异常时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 异常信息: {error_message} 请检查账号Cookie是否过期,如有需要请及时更新Cookie配置。""" logger.info(f"准备发送Token刷新异常通知: {self.cookie_id}") # 发送通知到各个渠道 notification_sent = False for notification in notifications: if not notification.get('enabled', True): continue channel_type = notification.get('channel_type') channel_config = notification.get('channel_config') try: if channel_type == 'qq': await self._send_qq_notification(channel_config, notification_msg) notification_sent = True else: logger.warning(f"不支持的通知渠道类型: {channel_type}") except Exception as notify_error: logger.error(f"发送Token刷新通知失败 ({notification.get('channel_name', 'Unknown')}): {self._safe_str(notify_error)}") # 如果成功发送了通知,更新最后发送时间 if notification_sent: self.last_notification_time[notification_type] = current_time logger.info(f"Token刷新通知已发送,下次可发送时间: {time.strftime('%H:%M:%S', time.localtime(current_time + self.notification_cooldown))}") except Exception as e: logger.error(f"处理Token刷新通知失败: {self._safe_str(e)}") def _is_normal_token_expiry(self, error_message: str) -> bool: """检查是否是正常的令牌过期或其他不需要通知的情况""" # 不需要发送通知的关键词 no_notification_keywords = [ # 正常的令牌过期 'FAIL_SYS_TOKEN_EXOIRED::令牌过期', 'FAIL_SYS_TOKEN_EXPIRED::令牌过期', 'FAIL_SYS_TOKEN_EXOIRED', 'FAIL_SYS_TOKEN_EXPIRED', '令牌过期', # Session过期(正常情况) 'FAIL_SYS_SESSION_EXPIRED::Session过期', 'FAIL_SYS_SESSION_EXPIRED', 'Session过期', # Token定时刷新失败(会自动重试) 'Token定时刷新失败,将自动重试', 'Token定时刷新失败' ] # 检查错误消息是否包含不需要通知的关键词 for keyword in no_notification_keywords: if keyword in error_message: return True return False async def send_delivery_failure_notification(self, send_user_name: str, send_user_id: str, item_id: str, error_message: str): """发送自动发货失败通知""" try: from db_manager import db_manager # 获取当前账号的通知配置 notifications = db_manager.get_account_notifications(self.cookie_id) if not notifications: logger.debug("未配置消息通知,跳过自动发货通知") return # 构造通知消息 notification_message = f"🚨 自动发货通知\n\n" \ f"账号: {self.cookie_id}\n" \ f"买家: {send_user_name} (ID: {send_user_id})\n" \ f"商品ID: {item_id}\n" \ f"结果: {error_message}\n" \ f"时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n" \ f"请及时处理!" # 发送通知到所有已启用的通知渠道 for notification in notifications: if notification.get('enabled', False): channel_type = notification.get('channel_type', 'qq') channel_config = notification.get('channel_config', '') if channel_type == 'qq': await self._send_qq_notification(channel_config, notification_message) logger.info(f"已发送自动发货通知到QQ: {channel_config}") except Exception as e: logger.error(f"发送自动发货通知异常: {self._safe_str(e)}") async def _auto_delivery(self, item_id: str, item_title: str = None): """自动发货功能""" try: from db_manager import db_manager logger.info(f"开始自动发货检查: 商品ID={item_id}") # 获取商品详细信息 item_info = None search_text = item_title # 默认使用传入的标题 if item_id and item_id != "未知商品": # 优先尝试通过API获取商品信息 try: logger.info(f"通过API获取商品详细信息: {item_id}") item_info = await self.get_item_info(item_id) if item_info and 'data' in item_info: data = item_info['data'] item_data = data['itemDO'] shareData = item_data['shareData'] shareInfoJsonString = shareData['shareInfoJsonString'] # 解析 shareInfoJsonString 并提取 content 内容 try: import json share_info = json.loads(shareInfoJsonString) content = share_info.get('contentParams', {}).get('mainParams', {}).get('content', '') if content: search_text = content logger.info(f"API成功提取商品内容作为搜索文本: {content[:100]}...") else: search_text = shareInfoJsonString logger.warning("未能从API商品信息中提取到content字段,使用完整JSON字符串") except json.JSONDecodeError as json_e: logger.warning(f"解析API商品信息JSON失败: {self._safe_str(json_e)},使用原始字符串") search_text = shareInfoJsonString except Exception as parse_e: logger.warning(f"提取API商品内容失败: {self._safe_str(parse_e)},使用原始字符串") search_text = shareInfoJsonString logger.info(f"API获取到的商品信息为: {search_text[:200]}...") else: raise Exception("API返回数据格式异常") except Exception as e: logger.warning(f"API获取商品信息失败: {self._safe_str(e)},尝试从数据库获取") # API失败时,从数据库获取商品信息 try: db_item_info = db_manager.get_item_info(self.cookie_id, item_id) if db_item_info: # 拼接商品标题和详情作为搜索文本 item_title_db = db_item_info.get('item_title', '') or '' item_detail_db = db_item_info.get('item_detail', '') or '' # 如果数据库中没有详情,尝试从外部API获取 if not item_detail_db.strip(): from config import config auto_fetch_config = config.get('ITEM_DETAIL', {}).get('auto_fetch', {}) if auto_fetch_config.get('enabled', True): logger.info(f"数据库中商品详情为空,尝试从外部API获取: {item_id}") try: fetched_detail = await self.fetch_item_detail_from_api(item_id) if fetched_detail: # 保存获取到的详情 await self.save_item_detail_only(item_id, fetched_detail) item_detail_db = fetched_detail logger.info(f"成功从外部API获取并保存商品详情: {item_id}") else: logger.warning(f"外部API未能获取到商品详情: {item_id}") except Exception as api_e: logger.warning(f"从外部API获取商品详情失败: {item_id}, 错误: {self._safe_str(api_e)}") else: logger.debug(f"自动获取商品详情功能已禁用,跳过: {item_id}") # 组合搜索文本:商品标题 + 商品详情 search_parts = [] if item_title_db.strip(): search_parts.append(item_title_db.strip()) if item_detail_db.strip(): search_parts.append(item_detail_db.strip()) if search_parts: search_text = ' '.join(search_parts) logger.info(f"使用数据库商品标题+详情作为搜索文本: 标题='{item_title_db}', 详情长度={len(item_detail_db)}") logger.debug(f"完整搜索文本: {search_text[:200]}...") else: logger.warning(f"数据库中商品标题和详情都为空,且无法从API获取: {item_id}") search_text = item_title or item_id else: logger.debug(f"数据库中未找到商品信息: {item_id}") search_text = item_title or item_id except Exception as db_e: logger.debug(f"从数据库获取商品信息失败: {self._safe_str(db_e)}") search_text = item_title or item_id if not search_text: search_text = item_id or "未知商品" logger.info(f"使用搜索文本匹配发货规则: {search_text[:100]}...") # 根据商品信息查找匹配的发货规则 delivery_rules = db_manager.get_delivery_rules_by_keyword(search_text) if not delivery_rules: logger.info(f"未找到匹配的发货规则: {search_text[:50]}...") return None # 使用第一个匹配的规则(按关键字长度降序排列,优先匹配更精确的规则) # 保存商品信息到数据库 await self.save_item_info_to_db(item_id, search_text) rule = delivery_rules[0] logger.info(f"找到匹配的发货规则: {rule['keyword']} -> {rule['card_name']} ({rule['card_type']})") delivery_content = None # 根据卡券类型处理发货内容 if rule['card_type'] == 'api': # API类型:调用API获取内容 delivery_content = await self._get_api_card_content(rule) elif rule['card_type'] == 'text': # 固定文字类型:直接使用文字内容 delivery_content = rule['card_text_content'] elif rule['card_type'] == 'data': # 批量数据类型:获取并消费第一条数据 delivery_content = db_manager.consume_batch_data(rule['card_id']) if delivery_content: # 处理备注信息和变量替换 final_content = self._process_delivery_content_with_description(delivery_content, rule.get('card_description', '')) # 增加发货次数统计 db_manager.increment_delivery_times(rule['id']) logger.info(f"自动发货成功: 规则ID={rule['id']}, 内容长度={len(final_content)}") return final_content else: logger.warning(f"获取发货内容失败: 规则ID={rule['id']}") return None except Exception as e: logger.error(f"自动发货失败: {self._safe_str(e)}") return None def _process_delivery_content_with_description(self, delivery_content: str, card_description: str) -> str: """处理发货内容和备注信息,实现变量替换""" try: # 如果没有备注信息,直接返回发货内容 if not card_description or not card_description.strip(): return delivery_content # 替换备注中的变量 processed_description = card_description.replace('{DELIVERY_CONTENT}', delivery_content) # 如果备注中包含变量替换,返回处理后的备注 if '{DELIVERY_CONTENT}' in card_description: return processed_description else: # 如果备注中没有变量,将备注和发货内容组合 return f"{processed_description}\n\n{delivery_content}" except Exception as e: logger.error(f"处理备注信息失败: {e}") # 出错时返回原始发货内容 return delivery_content async def _get_api_card_content(self, rule, retry_count=0): """调用API获取卡券内容,支持重试机制""" max_retries = 3 if retry_count >= max_retries: logger.error(f"API调用失败,已达到最大重试次数({max_retries})") return None try: import aiohttp import json import asyncio api_config = rule.get('card_api_config') if not api_config: logger.error("API配置为空") return None # 解析API配置 if isinstance(api_config, str): api_config = json.loads(api_config) url = api_config.get('url') method = api_config.get('method', 'GET').upper() timeout = api_config.get('timeout', 10) headers = api_config.get('headers', '{}') params = api_config.get('params', '{}') # 解析headers和params if isinstance(headers, str): headers = json.loads(headers) if isinstance(params, str): params = json.loads(params) retry_info = f" (重试 {retry_count + 1}/{max_retries})" if retry_count > 0 else "" logger.info(f"调用API获取卡券: {method} {url}{retry_info}") # 确保session存在 if not self.session: await self.create_session() # 发起HTTP请求 timeout_obj = aiohttp.ClientTimeout(total=timeout) if method == 'GET': async with self.session.get(url, headers=headers, params=params, timeout=timeout_obj) as response: status_code = response.status response_text = await response.text() elif method == 'POST': async with self.session.post(url, headers=headers, json=params, timeout=timeout_obj) as response: status_code = response.status response_text = await response.text() else: logger.error(f"不支持的HTTP方法: {method}") return None if status_code == 200: # 尝试解析JSON响应,如果失败则使用原始文本 try: result = json.loads(response_text) # 如果返回的是对象,尝试提取常见的内容字段 if isinstance(result, dict): content = result.get('data') or result.get('content') or result.get('card') or str(result) else: content = str(result) except: content = response_text logger.info(f"API调用成功,返回内容长度: {len(content)}") return content else: logger.warning(f"API调用失败: {status_code} - {response_text[:200]}...") # 如果是服务器错误(5xx)或请求超时,进行重试 if status_code >= 500 or status_code == 408: if retry_count < max_retries - 1: wait_time = (retry_count + 1) * 2 # 递增等待时间: 2s, 4s, 6s logger.info(f"等待 {wait_time} 秒后重试...") await asyncio.sleep(wait_time) return await self._get_api_card_content(rule, retry_count + 1) return None except (aiohttp.ClientTimeout, aiohttp.ClientError) as e: logger.warning(f"API调用网络异常: {self._safe_str(e)}") # 网络异常也进行重试 if retry_count < max_retries - 1: wait_time = (retry_count + 1) * 2 # 递增等待时间 logger.info(f"等待 {wait_time} 秒后重试...") await asyncio.sleep(wait_time) return await self._get_api_card_content(rule, retry_count + 1) else: logger.error(f"API调用网络异常,已达到最大重试次数: {self._safe_str(e)}") return None except Exception as e: logger.error(f"API调用异常: {self._safe_str(e)}") return None async def token_refresh_loop(self): """Token刷新循环""" while True: try: # 检查账号是否启用 from cookie_manager import manager as cookie_manager if cookie_manager and not cookie_manager.get_cookie_status(self.cookie_id): logger.info(f"【{self.cookie_id}】账号已禁用,停止Token刷新循环") break current_time = time.time() if current_time - self.last_token_refresh_time >= self.token_refresh_interval: logger.info("Token即将过期,准备刷新...") new_token = await self.refresh_token() if new_token: logger.info(f"【{self.cookie_id}】Token刷新成功,准备重新建立连接...") self.connection_restart_flag = True if self.ws: await self.ws.close() break else: logger.error(f"【{self.cookie_id}】Token刷新失败,将在{self.token_retry_interval // 60}分钟后重试") # 发送Token刷新失败通知 await self.send_token_refresh_notification("Token定时刷新失败,将自动重试", "token_scheduled_refresh_failed") await asyncio.sleep(self.token_retry_interval) continue await asyncio.sleep(60) except Exception as e: logger.error(f"Token刷新循环出错: {self._safe_str(e)}") await asyncio.sleep(60) async def create_chat(self, ws, toid, item_id='891198795482'): msg = { "lwp": "/r/SingleChatConversation/create", "headers": { "mid": generate_mid() }, "body": [ { "pairFirst": f"{toid}@goofish", "pairSecond": f"{self.myid}@goofish", "bizType": "1", "extension": { "itemId": item_id }, "ctx": { "appVersion": "1.0", "platform": "web" } } ] } await ws.send(json.dumps(msg)) async def send_msg(self, ws, cid, toid, text): text = { "contentType": 1, "text": { "text": text } } text_base64 = str(base64.b64encode(json.dumps(text).encode('utf-8')), 'utf-8') msg = { "lwp": "/r/MessageSend/sendByReceiverScope", "headers": { "mid": generate_mid() }, "body": [ { "uuid": generate_uuid(), "cid": f"{cid}@goofish", "conversationType": 1, "content": { "contentType": 101, "custom": { "type": 1, "data": text_base64 } }, "redPointPolicy": 0, "extension": { "extJson": "{}" }, "ctx": { "appVersion": "1.0", "platform": "web" }, "mtags": {}, "msgReadStatusSetting": 1 }, { "actualReceivers": [ f"{toid}@goofish", f"{self.myid}@goofish" ] } ] } await ws.send(json.dumps(msg)) async def init(self, ws): # 如果没有token或者token过期,获取新token token_refresh_attempted = False if not self.current_token or (time.time() - self.last_token_refresh_time) >= self.token_refresh_interval: logger.info(f"【{self.cookie_id}】获取初始token...") token_refresh_attempted = True await self.refresh_token() if not self.current_token: logger.error("无法获取有效token,初始化失败") # 只有在没有尝试刷新token的情况下才发送通知,避免与refresh_token中的通知重复 if not token_refresh_attempted: await self.send_token_refresh_notification("初始化时无法获取有效Token", "token_init_failed") else: logger.info("由于刚刚尝试过token刷新,跳过重复的初始化失败通知") raise Exception("Token获取失败") msg = { "lwp": "/reg", "headers": { "cache-header": "app-key token ua wv", "app-key": APP_CONFIG.get('app_key'), "token": self.current_token, "ua": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36 DingTalk(2.1.5) OS(Windows/10) Browser(Chrome/133.0.0.0) DingWeb/2.1.5 IMPaaS DingWeb/2.1.5", "dt": "j", "wv": "im:3,au:3,sy:6", "sync": "0,0;0;0;", "did": self.device_id, "mid": generate_mid() } } await ws.send(json.dumps(msg)) await asyncio.sleep(1) current_time = int(time.time() * 1000) msg = { "lwp": "/r/SyncStatus/ackDiff", "headers": {"mid": generate_mid()}, "body": [ { "pipeline": "sync", "tooLong2Tag": "PNM,1", "channel": "sync", "topic": "sync", "highPts": 0, "pts": current_time * 1000, "seq": 0, "timestamp": current_time } ] } await ws.send(json.dumps(msg)) logger.info(f'【{self.cookie_id}】连接注册完成') async def send_heartbeat(self, ws): """发送心跳包""" msg = { "lwp": "/!", "headers": { "mid": generate_mid() } } await ws.send(json.dumps(msg)) self.last_heartbeat_time = time.time() async def heartbeat_loop(self, ws): """心跳循环""" while True: try: # 检查账号是否启用 from cookie_manager import manager as cookie_manager if cookie_manager and not cookie_manager.get_cookie_status(self.cookie_id): logger.info(f"【{self.cookie_id}】账号已禁用,停止心跳循环") break await self.send_heartbeat(ws) await asyncio.sleep(self.heartbeat_interval) except Exception as e: logger.error(f"心跳发送失败: {self._safe_str(e)}") break async def handle_heartbeat_response(self, message_data): """处理心跳响应""" try: if message_data.get("code") == 200: self.last_heartbeat_response = time.time() logger.debug("心跳响应正常") return True except Exception as e: logger.error(f"处理心跳响应出错: {self._safe_str(e)}") return False async def send_msg_once(self, toid, item_id, text): headers = { "Cookie": self.cookies_str, "Host": "wss-goofish.dingtalk.com", "Connection": "Upgrade", "Pragma": "no-cache", "Cache-Control": "no-cache", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36", "Origin": "https://www.goofish.com", "Accept-Encoding": "gzip, deflate, br, zstd", "Accept-Language": "zh-CN,zh;q=0.9", } # 兼容不同版本的websockets库 try: async with websockets.connect( self.base_url, extra_headers=headers ) as websocket: await self._handle_websocket_connection(websocket, toid, item_id, text) except TypeError as e: # 安全地检查异常信息 error_msg = self._safe_str(e) if "extra_headers" in error_msg: logger.warning("websockets库不支持extra_headers参数,使用兼容模式") # 使用兼容模式,通过subprotocols传递部分头信息 async with websockets.connect( self.base_url, additional_headers=headers ) as websocket: await self._handle_websocket_connection(websocket, toid, item_id, text) else: raise async def _create_websocket_connection(self, headers): """创建WebSocket连接,兼容不同版本的websockets库""" try: # 尝试使用extra_headers参数 return websockets.connect( self.base_url, extra_headers=headers ) except TypeError as e: # 安全地检查异常信息 error_msg = self._safe_str(e) if "extra_headers" in error_msg: logger.warning("websockets库不支持extra_headers参数,使用兼容模式") # 使用additional_headers参数(较新版本) try: return websockets.connect( self.base_url, additional_headers=headers ) except TypeError: # 如果都不支持,则不传递headers logger.warning("websockets库不支持headers参数,使用基础连接模式") return websockets.connect(self.base_url) else: raise async def _handle_websocket_connection(self, websocket, toid, item_id, text): """处理WebSocket连接的具体逻辑""" await self.init(websocket) await self.create_chat(websocket, toid, item_id) async for message in websocket: try: logger.info(f"【{self.cookie_id}】message: {message}") message = json.loads(message) cid = message["body"]["singleChatConversation"]["cid"] cid = cid.split('@')[0] await self.send_msg(websocket, cid, toid, text) logger.info(f'【{self.cookie_id}】send message') return except Exception as e: pass def is_chat_message(self, message): """判断是否为用户聊天消息""" try: return ( isinstance(message, dict) and "1" in message and isinstance(message["1"], dict) and "10" in message["1"] and isinstance(message["1"]["10"], dict) and "reminderContent" in message["1"]["10"] ) except Exception: return False def is_sync_package(self, message_data): """判断是否为同步包消息""" try: return ( isinstance(message_data, dict) and "body" in message_data and "syncPushPackage" in message_data["body"] and "data" in message_data["body"]["syncPushPackage"] and len(message_data["body"]["syncPushPackage"]["data"]) > 0 ) except Exception: return False async def create_session(self): """创建aiohttp session""" if not self.session: # 创建带有cookies和headers的session headers = DEFAULT_HEADERS.copy() headers['cookie'] = self.cookies_str self.session = aiohttp.ClientSession( headers=headers, timeout=aiohttp.ClientTimeout(total=30) ) async def close_session(self): """关闭aiohttp session""" if self.session: await self.session.close() self.session = None async def get_api_reply(self, msg_time, user_url, send_user_id, send_user_name, item_id, send_message, chat_id): """调用API获取回复消息""" try: if not self.session: await self.create_session() api_config = AUTO_REPLY.get('api', {}) timeout = aiohttp.ClientTimeout(total=api_config.get('timeout', 10)) payload = { "cookie_id": self.cookie_id, "msg_time": msg_time, "user_url": user_url, "send_user_id": send_user_id, "send_user_name": send_user_name, "item_id": item_id, "send_message": send_message, "chat_id": chat_id } async with self.session.post( api_config.get('url', 'http://localhost:8080/xianyu/reply'), json=payload, timeout=timeout ) as response: result = await response.json() # 将code转换为字符串进行比较,或者直接用数字比较 if str(result.get('code')) == '200' or result.get('code') == 200: send_msg = result.get('data', {}).get('send_msg') if send_msg: # 格式化消息中的占位符 return send_msg.format( send_user_id=payload['send_user_id'], send_user_name=payload['send_user_name'], send_message=payload['send_message'] ) else: logger.warning("API返回成功但无回复消息") return None else: logger.warning(f"API返回错误: {result.get('msg', '未知错误')}") return None except asyncio.TimeoutError: logger.error("API调用超时") return None except Exception as e: logger.error(f"调用API出错: {self._safe_str(e)}") return None async def handle_message(self, message_data, websocket): """处理所有类型的消息""" try: # 检查账号是否启用 from cookie_manager import manager as cookie_manager if cookie_manager and not cookie_manager.get_cookie_status(self.cookie_id): logger.debug(f"【{self.cookie_id}】账号已禁用,跳过消息处理") return # 发送确认消息 try: message = message_data ack = { "code": 200, "headers": { "mid": message["headers"]["mid"] if "mid" in message["headers"] else generate_mid(), "sid": message["headers"]["sid"] if "sid" in message["headers"] else '', } } if 'app-key' in message["headers"]: ack["headers"]["app-key"] = message["headers"]["app-key"] if 'ua' in message["headers"]: ack["headers"]["ua"] = message["headers"]["ua"] if 'dt' in message["headers"]: ack["headers"]["dt"] = message["headers"]["dt"] await websocket.send(json.dumps(ack)) except Exception as e: pass # 如果不是同步包消息,直接返回 if not self.is_sync_package(message_data): return # 获取并解密数据 sync_data = message_data["body"]["syncPushPackage"]["data"][0] # 检查是否有必要的字段 if "data" not in sync_data: logger.debug("同步包中无data字段") return # 解密数据 message = None try: data = sync_data["data"] try: data = base64.b64decode(data).decode("utf-8") parsed_data = json.loads(data) # 处理未加密的消息(如系统提示等) msg_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) if isinstance(parsed_data, dict) and 'chatType' in parsed_data: if 'operation' in parsed_data and 'content' in parsed_data['operation']: content = parsed_data['operation']['content'] if 'sessionArouse' in content: # 处理系统引导消息 logger.info(f"[{msg_time}] 【{self.cookie_id}】【系统】小闲鱼智能提示:") if 'arouseChatScriptInfo' in content['sessionArouse']: for qa in content['sessionArouse']['arouseChatScriptInfo']: logger.info(f" - {qa['chatScrip']}") elif 'contentType' in content: # 其他类型的未加密消息 logger.debug(f"[{msg_time}] 【{self.cookie_id}】【系统】其他类型消息: {content}") return else: # 如果不是系统消息,将解析的数据作为message message = parsed_data except Exception as e: # 如果JSON解析失败,尝试解密 decrypted_data = decrypt(data) message = json.loads(decrypted_data) except Exception as e: logger.error(f"消息解密失败: {self._safe_str(e)}") return # 确保message不为空 if message is None: logger.error("消息解析后为空") return # 确保message是字典类型 if not isinstance(message, dict): logger.error(f"消息格式错误,期望字典但得到: {type(message)}") logger.debug(f"消息内容: {message}") return # 安全地获取用户ID user_id = None try: message_1 = message.get("1") if isinstance(message_1, str) and '@' in message_1: user_id = message_1.split('@')[0] elif isinstance(message_1, dict): # 如果message['1']是字典,尝试其他方式提取user_id user_id = "unknown_user" else: user_id = "unknown_user" except Exception as e: logger.debug(f"提取用户ID失败: {self._safe_str(e)}") user_id = "unknown_user" # 安全地获取商品ID # item_id = None # try: # # 尝试从reminderUrl中提取商品ID # message_1 = message.get("1") # if isinstance(message_1, dict): # reminder_data = message_1.get("10") # if isinstance(reminder_data, dict): # url_info = reminder_data.get("reminderUrl") # if isinstance(url_info, str) and "itemId=" in url_info: # item_id = url_info.split("itemId=")[1].split("&")[0] # logger.info(f"从reminderUrl提取商品ID: {item_id}") # # 如果没有提取到,使用辅助方法 # if not item_id: # item_id = self.extract_item_id_from_message(message) # # 最后的fallback # if not item_id: # item_id = f"auto_{user_id}_{int(time.time())}" # logger.warning(f"无法提取商品ID,使用默认值: {item_id}") # except Exception as e: # logger.error(f"提取商品ID时发生错误: {self._safe_str(e)}") # item_id = f"auto_{user_id}_{int(time.time())}" # 安全地提取商品ID item_id = None try: if "1" in message and isinstance(message["1"], dict) and "10" in message["1"] and isinstance(message["1"]["10"], dict): url_info = message["1"]["10"].get("reminderUrl", "") if isinstance(url_info, str) and "itemId=" in url_info: item_id = url_info.split("itemId=")[1].split("&")[0] # 如果没有提取到,使用辅助方法 if not item_id: item_id = self.extract_item_id_from_message(message) if not item_id: item_id = f"auto_{user_id}_{int(time.time())}" logger.debug(f"无法提取商品ID,使用默认值: {item_id}") except Exception as e: logger.error(f"提取商品ID时发生错误: {self._safe_str(e)}") item_id = f"auto_{user_id}_{int(time.time())}" # 处理订单状态消息 try: logger.info(message) msg_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) # 安全地检查订单状态 red_reminder = None if isinstance(message, dict) and "3" in message and isinstance(message["3"], dict): red_reminder = message["3"].get("redReminder") if red_reminder == '等待买家付款': user_url = f'https://www.goofish.com/personal?userId={user_id}' logger.info(f'[{msg_time}] 【系统】等待买家 {user_url} 付款') return elif red_reminder == '交易关闭': user_url = f'https://www.goofish.com/personal?userId={user_id}' logger.info(f'[{msg_time}] 【系统】买家 {user_url} 交易关闭') return elif red_reminder == '等待卖家发货': user_url = f'https://www.goofish.com/personal?userId={user_id}' logger.info(f'[{msg_time}] 【系统】交易成功 {user_url} 等待卖家发货') # return except: pass # 判断是否为聊天消息 if not self.is_chat_message(message): logger.debug("非聊天消息") return # 处理聊天消息 try: # 安全地提取聊天消息信息 if not (isinstance(message, dict) and "1" in message and isinstance(message["1"], dict)): logger.error("消息格式错误:缺少必要的字段结构") return message_1 = message["1"] if not isinstance(message_1.get("10"), dict): logger.error("消息格式错误:缺少消息详情字段") return create_time = int(message_1.get("5", 0)) message_10 = message_1["10"] send_user_name = message_10.get("senderNick", message_10.get("reminderTitle", "未知用户")) send_user_id = message_10.get("senderUserId", "unknown") send_message = message_10.get("reminderContent", "") chat_id_raw = message_1.get("2", "") chat_id = chat_id_raw.split('@')[0] if '@' in str(chat_id_raw) else str(chat_id_raw) except Exception as e: logger.error(f"提取聊天消息信息失败: {self._safe_str(e)}") return # 格式化消息时间 msg_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(create_time/1000)) # 人工接管模式已禁用,永远走自动模式 # if self.check_toggle_keywords(send_message): # is_manual = self.toggle_manual_mode(chat_id) # mode_str = "进入" if is_manual else "退出" # logger.info(f"[{msg_time}] 【系统】用户: {send_user_name} 的会话 {chat_id} [商品id: {item_id}] {mode_str}人工接管模式") # return # 判断消息方向 if send_user_id == self.myid: logger.info(f"[{msg_time}] 【手动发出】 商品({item_id}): {send_message}") return else: logger.info(f"[{msg_time}] 【收到】用户: {send_user_name} (ID: {send_user_id}), 商品({item_id}): {send_message}") # 人工接管模式已禁用,永远走自动模式 # if self.is_manual_mode(chat_id): # logger.info(f"[{msg_time}] 【系统】会话 {chat_id} 处于人工接管模式,不自动回复") # return # 自动回复消息 if not AUTO_REPLY.get('enabled', True): logger.info(f"[{msg_time}] 【{self.cookie_id}】【系统】自动回复已禁用") return # 构造用户URL user_url = f'https://www.goofish.com/personal?userId={send_user_id}' reply = None # 判断是否启用API回复 if AUTO_REPLY.get('api', {}).get('enabled', False): reply = await self.get_api_reply( msg_time, user_url, send_user_id, send_user_name, item_id, send_message, chat_id ) if not reply: logger.error(f"[{msg_time}] 【API调用失败】用户: {send_user_name} (ID: {send_user_id}), 商品({item_id}): {send_message}") if send_message == '[我已拍下,待付款]': logger.info(f'[{msg_time}] 【{self.cookie_id}】系统消息不处理') return elif send_message == '[你关闭了订单,钱款已原路退返]': logger.info(f'[{msg_time}] 【{self.cookie_id}】系统消息不处理') return elif send_message == '发来一条消息': logger.info(f'[{msg_time}] 【{self.cookie_id}】系统通知消息不处理') return elif send_message == '发来一条新消息': logger.info(f'[{msg_time}] 【{self.cookie_id}】系统通知消息不处理') return elif send_message == '[买家确认收货,交易成功]': logger.info(f'[{msg_time}] 【{self.cookie_id}】交易完成消息不处理') return elif send_message == '快给ta一个评价吧~' or send_message == '快给ta一个评价吧~': logger.info(f'[{msg_time}] 【{self.cookie_id}】评价提醒消息不处理') return elif send_message == '[你已发货]': logger.info(f'[{msg_time}] 【{self.cookie_id}】发货确认消息不处理') return elif send_message == '[我已付款,等待你发货]': logger.info(f'[{msg_time}] 【{self.cookie_id}】【系统】买家已付款,准备自动发货') # 检查是否可以进行自动发货(防重复) if not self.can_auto_delivery(item_id): return # 构造用户URL user_url = f'https://www.goofish.com/personal?userId={send_user_id}' # 自动发货逻辑 try: # 设置默认标题(将通过API获取真实商品信息) item_title = "待获取商品信息" logger.info(f"【{self.cookie_id}】准备自动发货: item_id={item_id}, item_title={item_title}") # 调用自动发货方法 delivery_content = await self._auto_delivery(item_id, item_title) if delivery_content: # 标记已发货(防重复) self.mark_delivery_sent(item_id) # 发送发货内容给买家 await self.send_msg(websocket, chat_id, send_user_id, delivery_content) logger.info(f'[{msg_time}] 【自动发货】已向 {user_url} 发送发货内容') await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "发货成功") else: logger.warning(f'[{msg_time}] 【自动发货】未找到匹配的发货规则或获取发货内容失败') # 发送自动发货失败通知 await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "未找到匹配的发货规则或获取发货内容失败") except Exception as e: logger.error(f"自动发货处理异常: {self._safe_str(e)}") # 发送自动发货异常通知 await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, f"自动发货处理异常: {str(e)}") return elif send_message == '[已付款,待发货]': logger.info(f'[{msg_time}] 【{self.cookie_id}】【系统】买家已付款,准备自动发货') # 检查是否可以进行自动发货(防重复) if not self.can_auto_delivery(item_id): return # 构造用户URL user_url = f'https://www.goofish.com/personal?userId={send_user_id}' # 自动发货逻辑 try: # 设置默认标题(将通过API获取真实商品信息) item_title = "待获取商品信息" logger.info(f"【{self.cookie_id}】准备自动发货: item_id={item_id}, item_title={item_title}") # 调用自动发货方法 delivery_content = await self._auto_delivery(item_id, item_title) if delivery_content: # 标记已发货(防重复) self.mark_delivery_sent(item_id) # 发送发货内容给买家 await self.send_msg(websocket, chat_id, send_user_id, delivery_content) logger.info(f'[{msg_time}] 【自动发货】已向 {user_url} 发送发货内容') await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "发货成功") else: logger.warning(f'[{msg_time}] 【自动发货】未找到匹配的发货规则或获取发货内容失败') # 发送自动发货失败通知 await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "未找到匹配的发货规则或获取发货内容失败") except Exception as e: logger.error(f"自动发货处理异常: {self._safe_str(e)}") # 发送自动发货异常通知 await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, f"自动发货处理异常: {str(e)}") return elif send_message == '[卡片消息]': # 检查是否为"我已小刀,待刀成"的卡片消息 try: # 从消息中提取卡片内容 card_title = None if isinstance(message, dict) and "1" in message and isinstance(message["1"], dict): message_1 = message["1"] if "6" in message_1 and isinstance(message_1["6"], dict): message_6 = message_1["6"] if "3" in message_6 and isinstance(message_6["3"], dict): message_6_3 = message_6["3"] if "5" in message_6_3: # 解析JSON内容 try: card_content = json.loads(message_6_3["5"]) if "dxCard" in card_content and "item" in card_content["dxCard"]: card_item = card_content["dxCard"]["item"] if "main" in card_item and "exContent" in card_item["main"]: ex_content = card_item["main"]["exContent"] card_title = ex_content.get("title", "") except (json.JSONDecodeError, KeyError) as e: logger.debug(f"解析卡片消息失败: {e}") # 检查是否为"我已小刀,待刀成" if card_title == "我已小刀,待刀成": logger.info(f'[{msg_time}] 【{self.cookie_id}】【系统】检测到"我已小刀,待刀成",准备自动发货') # 检查是否可以进行自动发货(防重复) if not self.can_auto_delivery(item_id): return # 构造用户URL user_url = f'https://www.goofish.com/personal?userId={send_user_id}' # 自动发货逻辑 try: # 设置默认标题(将通过API获取真实商品信息) item_title = "待获取商品信息" logger.info(f"【{self.cookie_id}】准备自动发货: item_id={item_id}, item_title={item_title}") # 调用自动发货方法 delivery_content = await self._auto_delivery(item_id, item_title) if delivery_content: # 标记已发货(防重复) self.mark_delivery_sent(item_id) # 发送发货内容给买家 await self.send_msg(websocket, chat_id, send_user_id, delivery_content) logger.info(f'[{msg_time}] 【自动发货】已向 {user_url} 发送发货内容') await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "发货成功") else: logger.warning(f'[{msg_time}] 【自动发货】未找到匹配的发货规则或获取发货内容失败') # 发送自动发货失败通知 await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "未找到匹配的发货规则或获取发货内容失败") except Exception as e: logger.error(f"自动发货处理异常: {self._safe_str(e)}") # 发送自动发货异常通知 await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, f"自动发货处理异常: {str(e)}") return else: logger.info(f'[{msg_time}] 【{self.cookie_id}】收到卡片消息,标题: {card_title or "未知"}') except Exception as e: logger.error(f"处理卡片消息异常: {self._safe_str(e)}") # 如果不是目标卡片消息,继续正常处理流程 # 记录回复来源 reply_source = 'API' # 默认假设是API回复 # 如果API回复失败或未启用API,按新的优先级顺序处理 if not reply: # 1. 首先尝试关键词匹配 reply = await self.get_keyword_reply(send_user_name, send_user_id, send_message) if reply: reply_source = '关键词' # 标记为关键词回复 else: # 2. 关键词匹配失败,如果AI开关打开,尝试AI回复 reply = await self.get_ai_reply(send_user_name, send_user_id, send_message, item_id, chat_id) if reply: reply_source = 'AI' # 标记为AI回复 else: # 3. 最后使用默认回复 reply = await self.get_default_reply(send_user_name, send_user_id, send_message) reply_source = '默认' # 标记为默认回复 # 保存商品信息到数据库(记录通知时传递的item_id) if item_id: await self.save_item_info_to_db(item_id, None) # 发送通知 await self.send_notification(send_user_name, send_user_id, send_message, item_id) # 如果有回复内容,发送消息 if reply: await self.send_msg(websocket, chat_id, send_user_id, reply) # 记录发出的消息 msg_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) logger.info(f"[{msg_time}] 【{reply_source}发出】用户: {send_user_name} (ID: {send_user_id}), 商品({item_id}): {reply}") else: msg_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) logger.info(f"[{msg_time}] 【{self.cookie_id}】【系统】未找到匹配的回复规则,不回复") except Exception as e: logger.error(f"处理消息时发生错误: {self._safe_str(e)}") logger.debug(f"原始消息: {message_data}") async def main(self): """主程序入口""" try: await self.create_session() # 创建session while True: try: # 检查账号是否启用 from cookie_manager import manager as cookie_manager if cookie_manager and not cookie_manager.get_cookie_status(self.cookie_id): logger.info(f"【{self.cookie_id}】账号已禁用,停止主循环") break headers = WEBSOCKET_HEADERS.copy() headers['Cookie'] = self.cookies_str # 兼容不同版本的websockets库 async with await self._create_websocket_connection(headers) as websocket: self.ws = websocket await self.init(websocket) # 启动心跳任务 self.heartbeat_task = asyncio.create_task(self.heartbeat_loop(websocket)) # 启动token刷新任务 self.token_refresh_task = asyncio.create_task(self.token_refresh_loop()) async for message in websocket: try: message_data = json.loads(message) # 处理心跳响应 if await self.handle_heartbeat_response(message_data): continue # 处理其他消息 await self.handle_message(message_data, websocket) except Exception as e: logger.error(f"处理消息出错: {self._safe_str(e)}") continue except Exception as e: logger.error(f"WebSocket连接异常: {self._safe_str(e)}") if self.heartbeat_task: self.heartbeat_task.cancel() if self.token_refresh_task: self.token_refresh_task.cancel() await asyncio.sleep(5) # 等待5秒后重试 continue finally: await self.close_session() # 确保关闭session async def get_item_list_info(self, page_number=1, page_size=20, retry_count=0): """获取商品信息,自动处理token失效的情况 Args: page_number (int): 页码,从1开始 page_size (int): 每页数量,默认20 retry_count (int): 重试次数,内部使用 """ if retry_count >= 3: # 最多重试3次 logger.error("获取商品信息失败,重试次数过多") return {"error": "获取商品信息失败,重试次数过多"} # 如果是重试(retry_count > 0),强制刷新token if retry_count > 0: old_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else '' logger.info(f"重试第{retry_count}次,强制刷新token... 当前_m_h5_tk: {old_token}") await self.refresh_token() new_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else '' logger.info(f"重试刷新token完成,新的_m_h5_tk: {new_token}") else: # 确保使用最新的token(首次调用时的正常逻辑) if not self.current_token or (time.time() - self.last_token_refresh_time) >= self.token_refresh_interval: old_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else '' logger.info(f"Token过期或不存在,刷新token... 当前_m_h5_tk: {old_token}") await self.refresh_token() new_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else '' logger.info(f"Token刷新完成,新的_m_h5_tk: {new_token}") # 确保session已创建 if not self.session: await self.create_session() params = { 'jsv': '2.7.2', 'appKey': '34839810', 't': str(int(time.time()) * 1000), 'sign': '', 'v': '1.0', 'type': 'originaljson', 'accountSite': 'xianyu', 'dataType': 'json', 'timeout': '20000', 'api': 'mtop.idle.web.xyh.item.list', 'sessionOption': 'AutoLoginOnly', 'spm_cnt': 'a21ybx.im.0.0', 'spm_pre': 'a21ybx.collection.menu.1.272b5141NafCNK' } data = { 'needGroupInfo': False, 'pageNumber': page_number, 'pageSize': page_size, 'groupName': '在售', 'groupId': '58877261', 'defaultGroup': True, "userId": self.myid } # 始终从最新的cookies中获取_m_h5_tk token(刷新后cookies会被更新) token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else '' logger.warning(f"准备获取商品列表,token: {token}") if token: logger.debug(f"使用cookies中的_m_h5_tk token: {token}") else: logger.warning("cookies中没有找到_m_h5_tk token") # 生成签名 data_val = json.dumps(data, separators=(',', ':')) sign = generate_sign(params['t'], token, data_val) params['sign'] = sign try: async with self.session.post( 'https://h5api.m.goofish.com/h5/mtop.idle.web.xyh.item.list/1.0/', params=params, data={'data': data_val} ) as response: res_json = await response.json() logger.info(f"商品信息获取响应: {res_json}") # 检查响应是否成功 if res_json.get('ret') and res_json['ret'][0] == 'SUCCESS::调用成功': items_data = res_json.get('data', {}) # 从cardList中提取商品信息 card_list = items_data.get('cardList', []) # 解析cardList中的商品信息 items_list = [] for card in card_list: card_data = card.get('cardData', {}) if card_data: # 提取商品基本信息 item_info = { 'id': card_data.get('id', ''), 'title': card_data.get('title', ''), 'price': card_data.get('priceInfo', {}).get('price', ''), 'price_text': card_data.get('priceInfo', {}).get('preText', '') + card_data.get('priceInfo', {}).get('price', ''), 'category_id': card_data.get('categoryId', ''), 'auction_type': card_data.get('auctionType', ''), 'item_status': card_data.get('itemStatus', 0), 'detail_url': card_data.get('detailUrl', ''), 'pic_info': card_data.get('picInfo', {}), 'detail_params': card_data.get('detailParams', {}), 'track_params': card_data.get('trackParams', {}), 'item_label_data': card_data.get('itemLabelDataVO', {}), 'card_type': card.get('cardType', 0) } items_list.append(item_info) logger.info(f"成功获取到 {len(items_list)} 个商品") # 打印商品详细信息到控制台 print("\n" + "="*80) print(f"📦 账号 {self.myid} 的商品列表 (第{page_number}页,{len(items_list)} 个商品)") print("="*80) for i, item in enumerate(items_list, 1): print(f"\n🔸 商品 {i}:") print(f" 商品ID: {item.get('id', 'N/A')}") print(f" 商品标题: {item.get('title', 'N/A')}") print(f" 价格: {item.get('price_text', 'N/A')}") print(f" 分类ID: {item.get('category_id', 'N/A')}") print(f" 商品状态: {item.get('item_status', 'N/A')}") print(f" 拍卖类型: {item.get('auction_type', 'N/A')}") print(f" 详情链接: {item.get('detail_url', 'N/A')}") if item.get('pic_info'): pic_info = item['pic_info'] print(f" 图片信息: {pic_info.get('width', 'N/A')}x{pic_info.get('height', 'N/A')}") print(f" 图片链接: {pic_info.get('picUrl', 'N/A')}") print(f" 完整信息: {json.dumps(item, ensure_ascii=False, indent=2)}") print("\n" + "="*80) print("✅ 商品列表获取完成") print("="*80) # 自动保存商品信息到数据库 if items_list: saved_count = await self.save_items_list_to_db(items_list) logger.info(f"已将 {saved_count} 个商品信息保存到数据库") return { "success": True, "page_number": page_number, "page_size": page_size, "current_count": len(items_list), "items": items_list, "saved_count": saved_count if items_list else 0, "raw_data": items_data # 保留原始数据以备调试 } else: # 检查是否是token失效 error_msg = res_json.get('ret', [''])[0] if res_json.get('ret') else '' if 'FAIL_SYS_TOKEN_EXOIRED' in error_msg or 'token' in error_msg.lower(): logger.warning(f"Token失效,准备重试: {error_msg}") await asyncio.sleep(0.5) return await self.get_item_list_info(page_number, page_size, retry_count + 1) else: logger.error(f"获取商品信息失败: {res_json}") return {"error": f"获取商品信息失败: {error_msg}"} except Exception as e: logger.error(f"商品信息API请求异常: {self._safe_str(e)}") await asyncio.sleep(0.5) return await self.get_item_list_info(page_number, page_size, retry_count + 1) async def get_all_items(self, page_size=20, max_pages=None): """获取所有商品信息(自动分页) Args: page_size (int): 每页数量,默认20 max_pages (int): 最大页数限制,None表示无限制 Returns: dict: 包含所有商品信息的字典 """ all_items = [] page_number = 1 total_saved = 0 logger.info(f"开始获取所有商品信息,每页{page_size}条") while True: if max_pages and page_number > max_pages: logger.info(f"达到最大页数限制 {max_pages},停止获取") break logger.info(f"正在获取第 {page_number} 页...") result = await self.get_item_list_info(page_number, page_size) if not result.get("success"): logger.error(f"获取第 {page_number} 页失败: {result}") break current_items = result.get("items", []) if not current_items: logger.info(f"第 {page_number} 页没有数据,获取完成") break all_items.extend(current_items) total_saved += result.get("saved_count", 0) logger.info(f"第 {page_number} 页获取到 {len(current_items)} 个商品") # 如果当前页商品数量少于页面大小,说明已经是最后一页 if len(current_items) < page_size: logger.info(f"第 {page_number} 页商品数量({len(current_items)})少于页面大小({page_size}),获取完成") break page_number += 1 # 添加延迟避免请求过快 await asyncio.sleep(1) logger.info(f"所有商品获取完成,共 {len(all_items)} 个商品,保存了 {total_saved} 个") return { "success": True, "total_pages": page_number, "total_count": len(all_items), "total_saved": total_saved, "items": all_items } if __name__ == '__main__': cookies_str = os.getenv('COOKIES_STR') xianyuLive = XianyuLive(cookies_str) asyncio.run(xianyuLive.main())