diff --git a/XianyuAutoAsync.py b/XianyuAutoAsync.py index 09506c4..fd60754 100644 --- a/XianyuAutoAsync.py +++ b/XianyuAutoAsync.py @@ -12,14 +12,13 @@ from utils.xianyu_utils import ( ) from config import ( WEBSOCKET_URL, HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT, - TOKEN_REFRESH_INTERVAL, TOKEN_RETRY_INTERVAL, config, COOKIES_STR, + TOKEN_REFRESH_INTERVAL, TOKEN_RETRY_INTERVAL, COOKIES_STR, LOG_CONFIG, AUTO_REPLY, DEFAULT_HEADERS, WEBSOCKET_HEADERS, APP_CONFIG, API_ENDPOINTS ) -from utils.message_utils import format_message, format_system_message -from utils.ws_utils import WebSocketClient import sys import aiohttp +from collections import defaultdict class AutoReplyPauseManager: @@ -108,6 +107,18 @@ logger.add( ) class XianyuLive: + # 类级别的锁字典,为每个order_id维护一个锁(用于自动发货) + _order_locks = defaultdict(lambda: asyncio.Lock()) + # 记录锁的最后使用时间,用于清理 + _lock_usage_times = {} + # 记录锁的持有状态和释放时间 {lock_key: {'locked': bool, 'release_time': float, 'task': asyncio.Task}} + _lock_hold_info = {} + + # 独立的锁字典,用于订单详情获取(不使用延迟锁机制) + _order_detail_locks = defaultdict(lambda: asyncio.Lock()) + # 记录订单详情锁的使用时间 + _order_detail_lock_times = {} + def _safe_str(self, e): """安全地将异常转换为字符串""" try: @@ -172,6 +183,8 @@ class XianyuLive: self.confirmed_orders = {} # 记录已确认发货的订单,防止重复确认 self.order_confirm_cooldown = 600 # 10分钟内不重复确认同一订单 + # 自动发货已发送订单记录 + self.delivery_sent_orders = set() # 记录已发货的订单ID,防止重复发货 self.session = None # 用于API调用的aiohttp session @@ -205,12 +218,111 @@ class XianyuLive: return True def mark_delivery_sent(self, order_id: str): - """标记订单已发货 - 基于订单ID""" - if order_id: - self.last_delivery_time[order_id] = time.time() - logger.debug(f"【{self.cookie_id}】标记订单 {order_id} 已发货") - else: - logger.debug(f"【{self.cookie_id}】无订单ID,跳过发货标记") + """标记订单已发货""" + self.delivery_sent_orders.add(order_id) + logger.info(f"【{self.cookie_id}】订单 {order_id} 已标记为发货") + + async def _delayed_lock_release(self, lock_key: str, delay_minutes: int = 10): + """ + 延迟释放锁的异步任务 + + Args: + lock_key: 锁的键 + delay_minutes: 延迟时间(分钟),默认10分钟 + """ + try: + delay_seconds = delay_minutes * 60 + logger.info(f"【{self.cookie_id}】订单锁 {lock_key} 将在 {delay_minutes} 分钟后释放") + + # 等待指定时间 + await asyncio.sleep(delay_seconds) + + # 检查锁是否仍然存在且需要释放 + if lock_key in self._lock_hold_info: + lock_info = self._lock_hold_info[lock_key] + if lock_info.get('locked', False): + # 释放锁 + lock_info['locked'] = False + lock_info['release_time'] = time.time() + logger.info(f"【{self.cookie_id}】订单锁 {lock_key} 延迟释放完成") + + # 清理锁信息(可选,也可以保留用于统计) + # del self._lock_hold_info[lock_key] + + except asyncio.CancelledError: + logger.info(f"【{self.cookie_id}】订单锁 {lock_key} 延迟释放任务被取消") + except Exception as e: + logger.error(f"【{self.cookie_id}】订单锁 {lock_key} 延迟释放失败: {self._safe_str(e)}") + + def is_lock_held(self, lock_key: str) -> bool: + """ + 检查指定的锁是否仍在持有状态 + + Args: + lock_key: 锁的键 + + Returns: + bool: True表示锁仍在持有,False表示锁已释放或不存在 + """ + if lock_key not in self._lock_hold_info: + return False + + lock_info = self._lock_hold_info[lock_key] + return lock_info.get('locked', False) + + def cleanup_expired_locks(self, max_age_hours: int = 24): + """ + 清理过期的锁(包括自动发货锁和订单详情锁) + + Args: + max_age_hours: 锁的最大保留时间(小时),默认24小时 + """ + try: + current_time = time.time() + max_age_seconds = max_age_hours * 3600 + + # 清理自动发货锁 + expired_delivery_locks = [] + for order_id, last_used in self._lock_usage_times.items(): + if current_time - last_used > max_age_seconds: + expired_delivery_locks.append(order_id) + + # 清理过期的自动发货锁 + for order_id in expired_delivery_locks: + if order_id in self._order_locks: + del self._order_locks[order_id] + if order_id in self._lock_usage_times: + del self._lock_usage_times[order_id] + # 清理锁持有信息 + if order_id in self._lock_hold_info: + lock_info = self._lock_hold_info[order_id] + # 取消延迟释放任务 + if 'task' in lock_info and lock_info['task']: + lock_info['task'].cancel() + del self._lock_hold_info[order_id] + + # 清理订单详情锁 + expired_detail_locks = [] + for order_id, last_used in self._order_detail_lock_times.items(): + if current_time - last_used > max_age_seconds: + expired_detail_locks.append(order_id) + + # 清理过期的订单详情锁 + for order_id in expired_detail_locks: + if order_id in self._order_detail_locks: + del self._order_detail_locks[order_id] + if order_id in self._order_detail_lock_times: + del self._order_detail_lock_times[order_id] + + total_expired = len(expired_delivery_locks) + len(expired_detail_locks) + if total_expired > 0: + logger.info(f"【{self.cookie_id}】清理了 {total_expired} 个过期锁 (发货锁: {len(expired_delivery_locks)}, 详情锁: {len(expired_detail_locks)})") + logger.debug(f"【{self.cookie_id}】当前锁数量 - 发货锁: {len(self._order_locks)}, 详情锁: {len(self._order_detail_locks)}") + + except Exception as e: + logger.error(f"【{self.cookie_id}】清理过期锁时发生错误: {self._safe_str(e)}") + + def _is_auto_delivery_trigger(self, message: str) -> bool: """检查消息是否为自动发货触发关键字""" @@ -354,66 +466,109 @@ class XianyuLive: else: logger.warning(f'[{msg_time}] 【{self.cookie_id}】❌ 未能提取到订单ID') - # 检查是否可以进行自动发货(防重复)- 基于订单ID - if not self.can_auto_delivery(order_id): + # 使用订单ID作为锁的键,如果没有订单ID则使用item_id+chat_id组合 + lock_key = order_id if order_id else f"{item_id}_{chat_id}" + + # 第一重检查:延迟锁状态(在获取锁之前检查,避免不必要的等待) + if self.is_lock_held(lock_key): + logger.info(f'[{msg_time}] 【{self.cookie_id}】🔒【提前检查】订单 {lock_key} 延迟锁仍在持有状态,跳过发货') return - # 构造用户URL - user_url = f'https://www.goofish.com/personal?userId={send_user_id}' + # 第二重检查:基于时间的冷却机制 + if not self.can_auto_delivery(order_id): + logger.info(f'[{msg_time}] 【{self.cookie_id}】订单 {order_id} 在冷却期内,跳过发货') + return - # 自动发货逻辑 - try: - # 设置默认标题(将通过API获取真实商品信息) - item_title = "待获取商品信息" + # 获取或创建该订单的锁 + order_lock = self._order_locks[lock_key] - logger.info(f"【{self.cookie_id}】准备自动发货: item_id={item_id}, item_title={item_title}") + # 更新锁的使用时间 + self._lock_usage_times[lock_key] = time.time() - # 调用自动发货方法(包含自动确认发货) - delivery_content = await self._auto_delivery(item_id, item_title, order_id, send_user_id) + # 使用异步锁防止同一订单的并发处理 + async with order_lock: + logger.info(f'[{msg_time}] 【{self.cookie_id}】获取订单锁成功: {lock_key},开始处理自动发货') - if delivery_content: - # 标记已发货(防重复)- 基于订单ID - self.mark_delivery_sent(order_id) + # 第三重检查:获取锁后再次检查延迟锁状态(双重检查,防止在等待锁期间状态发生变化) + if self.is_lock_held(lock_key): + logger.info(f'[{msg_time}] 【{self.cookie_id}】订单 {lock_key} 在获取锁后检查发现延迟锁仍持有,跳过发货') + return - # 检查是否是图片发送标记 - if delivery_content.startswith("__IMAGE_SEND__"): - # 提取卡券ID和图片URL - image_data = delivery_content.replace("__IMAGE_SEND__", "") - if "|" in image_data: - card_id_str, image_url = image_data.split("|", 1) - try: - card_id = int(card_id_str) - except ValueError: - logger.error(f"无效的卡券ID: {card_id_str}") + # 第四重检查:获取锁后再次检查冷却状态 + if not self.can_auto_delivery(order_id): + logger.info(f'[{msg_time}] 【{self.cookie_id}】订单 {order_id} 在获取锁后检查发现仍在冷却期,跳过发货') + return + + # 构造用户URL + user_url = f'https://www.goofish.com/personal?userId={send_user_id}' + + # 自动发货逻辑 + try: + # 设置默认标题(将通过API获取真实商品信息) + item_title = "待获取商品信息" + + logger.info(f"【{self.cookie_id}】准备自动发货: item_id={item_id}, item_title={item_title}") + + # 调用自动发货方法(包含自动确认发货) + delivery_content = await self._auto_delivery(item_id, item_title, order_id, send_user_id) + + if delivery_content: + # 标记已发货(防重复)- 基于订单ID + self.mark_delivery_sent(order_id) + + # 标记锁为持有状态,并启动延迟释放任务 + self._lock_hold_info[lock_key] = { + 'locked': True, + 'lock_time': time.time(), + 'release_time': None, + 'task': None + } + + # 启动延迟释放锁的异步任务(10分钟后释放) + delay_task = asyncio.create_task(self._delayed_lock_release(lock_key, delay_minutes=10)) + self._lock_hold_info[lock_key]['task'] = delay_task + + # 检查是否是图片发送标记 + if delivery_content.startswith("__IMAGE_SEND__"): + # 提取卡券ID和图片URL + image_data = delivery_content.replace("__IMAGE_SEND__", "") + if "|" in image_data: + card_id_str, image_url = image_data.split("|", 1) + try: + card_id = int(card_id_str) + except ValueError: + logger.error(f"无效的卡券ID: {card_id_str}") + card_id = None + else: + # 兼容旧格式(没有卡券ID) card_id = None + image_url = image_data + + # 发送图片消息 + try: + await self.send_image_msg(websocket, chat_id, send_user_id, image_url, card_id=card_id) + logger.info(f'[{msg_time}] 【自动发货图片】已向 {user_url} 发送图片: {image_url}') + await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "发货成功") + except Exception as e: + logger.error(f"自动发货图片失败: {self._safe_str(e)}") + await self.send_msg(websocket, chat_id, send_user_id, "抱歉,图片发送失败,请联系客服。") + await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "图片发送失败") else: - # 兼容旧格式(没有卡券ID) - card_id = None - image_url = image_data - - # 发送图片消息 - try: - await self.send_image_msg(websocket, chat_id, send_user_id, image_url, card_id=card_id) - logger.info(f'[{msg_time}] 【自动发货图片】已向 {user_url} 发送图片: {image_url}') + # 普通文本发货内容 + await self.send_msg(websocket, chat_id, send_user_id, delivery_content) + logger.info(f'[{msg_time}] 【自动发货】已向 {user_url} 发送发货内容') await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "发货成功") - except Exception as e: - logger.error(f"自动发货图片失败: {self._safe_str(e)}") - await self.send_msg(websocket, chat_id, send_user_id, "抱歉,图片发送失败,请联系客服。") - await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "图片发送失败") else: - # 普通文本发货内容 - await self.send_msg(websocket, chat_id, send_user_id, delivery_content) - logger.info(f'[{msg_time}] 【自动发货】已向 {user_url} 发送发货内容') - await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "发货成功") - else: - logger.warning(f'[{msg_time}] 【自动发货】未找到匹配的发货规则或获取发货内容失败') - # 发送自动发货失败通知 - await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "未找到匹配的发货规则或获取发货内容失败") + logger.warning(f'[{msg_time}] 【自动发货】未找到匹配的发货规则或获取发货内容失败') + # 发送自动发货失败通知 + await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "未找到匹配的发货规则或获取发货内容失败") - except Exception as e: - logger.error(f"自动发货处理异常: {self._safe_str(e)}") - # 发送自动发货异常通知 - await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, f"自动发货处理异常: {str(e)}") + except Exception as e: + logger.error(f"自动发货处理异常: {self._safe_str(e)}") + # 发送自动发货异常通知 + await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, f"自动发货处理异常: {str(e)}") + + logger.info(f'[{msg_time}] 【{self.cookie_id}】订单锁释放: {lock_key},自动发货处理完成') except Exception as e: logger.error(f"统一自动发货处理异常: {self._safe_str(e)}") @@ -492,6 +647,7 @@ class XianyuLive: return new_token logger.error(f"【{self.cookie_id}】Token刷新失败: {res_json}") + # 发送Token刷新失败通知 await self.send_token_refresh_notification(f"Token刷新失败: {res_json}", "token_refresh_failed") return None @@ -531,6 +687,40 @@ class XianyuLive: # 发送Cookie更新失败通知 await self.send_token_refresh_notification(f"Cookie更新失败: {str(e)}", "cookie_update_failed") + async def _restart_instance(self): + """重启XianyuLive实例""" + try: + logger.info(f"【{self.cookie_id}】Token刷新成功,准备重启实例...") + + # 导入CookieManager + from cookie_manager import manager as cookie_manager + + if cookie_manager: + # 通过CookieManager重启实例 + logger.info(f"【{self.cookie_id}】通过CookieManager重启实例...") + + # 使用异步方式调用update_cookie,避免阻塞 + def restart_task(): + try: + cookie_manager.update_cookie(self.cookie_id, self.cookies_str) + logger.info(f"【{self.cookie_id}】实例重启请求已发送") + except Exception as e: + logger.error(f"【{self.cookie_id}】重启实例失败: {e}") + + # 在后台执行重启任务 + import threading + restart_thread = threading.Thread(target=restart_task, daemon=True) + restart_thread.start() + + logger.info(f"【{self.cookie_id}】实例重启已在后台执行") + else: + logger.warning(f"【{self.cookie_id}】CookieManager不可用,无法重启实例") + + except Exception as e: + logger.error(f"【{self.cookie_id}】重启实例失败: {self._safe_str(e)}") + # 发送重启失败通知 + await self.send_token_refresh_notification(f"实例重启失败: {str(e)}", "instance_restart_failed") + async def save_item_info_to_db(self, item_id: str, item_detail: str = None, item_title: str = None): """保存商品信息到数据库 @@ -801,22 +991,6 @@ class XianyuLive: logger.error("获取商品信息失败,重试次数过多") return {"error": "获取商品信息失败,重试次数过多"} - # 如果是重试(retry_count > 0),强制刷新token - if retry_count > 0: - old_token = self.cookies.get('_m_h5_tk', '').split('_')[0] if self.cookies.get('_m_h5_tk') else '' - logger.info(f"重试第{retry_count}次,强制刷新token... 当前_m_h5_tk: {old_token}") - await self.refresh_token() - new_token = self.cookies.get('_m_h5_tk', '').split('_')[0] if self.cookies.get('_m_h5_tk') else '' - logger.info(f"重试刷新token完成,新的_m_h5_tk: {new_token}") - else: - # 确保使用最新的token(首次调用时的正常逻辑) - if not self.current_token or (time.time() - self.last_token_refresh_time) >= self.token_refresh_interval: - old_token = self.cookies.get('_m_h5_tk', '').split('_')[0] if self.cookies.get('_m_h5_tk') else '' - logger.info(f"Token过期或不存在,刷新token... 当前_m_h5_tk: {old_token}") - await self.refresh_token() - new_token = self.cookies.get('_m_h5_tk', '').split('_')[0] if self.cookies.get('_m_h5_tk') else '' - logger.info(f"Token刷新完成,新的_m_h5_tk: {new_token}") - # 确保session已创建 if not self.session: await self.create_session() @@ -844,8 +1018,6 @@ class XianyuLive: # 始终从最新的cookies中获取_m_h5_tk token(刷新后cookies会被更新) token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else '' - logger.warning(111) - logger.warning(token) if token: logger.debug(f"使用cookies中的_m_h5_tk token: {token}") else: @@ -1939,100 +2111,108 @@ class XianyuLive: return {"error": f"加密确认模块调用失败: {self._safe_str(e)}", "order_id": order_id} async def auto_freeshipping(self, order_id, item_id, buyer_id, retry_count=0): - """自动免拼发货 - 使用加密模块""" + """自动免拼发货 - 使用解密模块""" try: logger.debug(f"【{self.cookie_id}】开始免拼发货,订单ID: {order_id}") - # 导入超级混淆加密模块 - from secure_freeshipping_ultra import SecureFreeshipping + # 导入解密后的免拼发货模块 + from secure_freeshipping_decrypted import SecureFreeshipping - # 创建加密免拼发货实例 + # 创建免拼发货实例 secure_freeshipping = SecureFreeshipping(self.session, self.cookies_str, self.cookie_id) # 传递必要的属性 secure_freeshipping.current_token = self.current_token secure_freeshipping.last_token_refresh_time = self.last_token_refresh_time secure_freeshipping.token_refresh_interval = self.token_refresh_interval - secure_freeshipping.refresh_token = self.refresh_token # 传递refresh_token方法 - # 调用加密的免拼发货方法 + # 调用免拼发货方法 return await secure_freeshipping.auto_freeshipping(order_id, item_id, buyer_id, retry_count) except Exception as e: - logger.error(f"【{self.cookie_id}】加密免拼发货模块调用失败: {self._safe_str(e)}") - return {"error": f"加密免拼发货模块调用失败: {self._safe_str(e)}", "order_id": order_id} + logger.error(f"【{self.cookie_id}】免拼发货模块调用失败: {self._safe_str(e)}") + return {"error": f"免拼发货模块调用失败: {self._safe_str(e)}", "order_id": order_id} async def fetch_order_detail_info(self, order_id: str, item_id: str = None, buyer_id: str = None, debug_headless: bool = None): - """获取订单详情信息""" - try: - logger.info(f"【{self.cookie_id}】开始获取订单详情: {order_id}") + """获取订单详情信息(使用独立的锁机制,不受延迟锁影响)""" + # 使用独立的订单详情锁,不与自动发货锁冲突 + order_detail_lock = self._order_detail_locks[order_id] - # 导入订单详情获取器 - from utils.order_detail_fetcher import fetch_order_detail_simple - from db_manager import db_manager + # 记录订单详情锁的使用时间 + self._order_detail_lock_times[order_id] = time.time() - # 获取当前账号的cookie字符串 - cookie_string = self.cookies_str - logger.debug(f"【{self.cookie_id}】使用Cookie长度: {len(cookie_string) if cookie_string else 0}") + async with order_detail_lock: + logger.info(f"🔍 【{self.cookie_id}】获取订单详情锁 {order_id},开始处理...") + + try: + logger.info(f"【{self.cookie_id}】开始获取订单详情: {order_id}") - # 确定是否使用有头模式(调试用) - headless_mode = True if debug_headless is None else debug_headless - if not headless_mode: - logger.info(f"【{self.cookie_id}】🖥️ 启用有头模式进行调试") + # 导入订单详情获取器 + from utils.order_detail_fetcher import fetch_order_detail_simple + from db_manager import db_manager - # 异步获取订单详情(使用当前账号的cookie) - result = await fetch_order_detail_simple(order_id, cookie_string, headless=headless_mode) + # 获取当前账号的cookie字符串 + cookie_string = self.cookies_str + logger.debug(f"【{self.cookie_id}】使用Cookie长度: {len(cookie_string) if cookie_string else 0}") - if result: - logger.info(f"【{self.cookie_id}】订单详情获取成功: {order_id}") - logger.info(f"【{self.cookie_id}】页面标题: {result.get('title', '未知')}") + # 确定是否使用有头模式(调试用) + headless_mode = True if debug_headless is None else debug_headless + if not headless_mode: + logger.info(f"【{self.cookie_id}】🖥️ 启用有头模式进行调试") - # 获取解析后的规格信息 - spec_name = result.get('spec_name', '') - spec_value = result.get('spec_value', '') - quantity = result.get('quantity', '') - amount = result.get('amount', '') + # 异步获取订单详情(使用当前账号的cookie) + result = await fetch_order_detail_simple(order_id, cookie_string, headless=headless_mode) - if spec_name and spec_value: - logger.info(f"【{self.cookie_id}】📋 规格名称: {spec_name}") - logger.info(f"【{self.cookie_id}】📝 规格值: {spec_value}") - print(f"🛍️ 【{self.cookie_id}】订单 {order_id} 规格信息: {spec_name} -> {spec_value}") - else: - logger.warning(f"【{self.cookie_id}】未获取到有效的规格信息") - print(f"⚠️ 【{self.cookie_id}】订单 {order_id} 规格信息获取失败") + if result: + logger.info(f"【{self.cookie_id}】订单详情获取成功: {order_id}") + logger.info(f"【{self.cookie_id}】页面标题: {result.get('title', '未知')}") - # 插入或更新订单信息到数据库 - try: - success = db_manager.insert_or_update_order( - order_id=order_id, - item_id=item_id, - buyer_id=buyer_id, - spec_name=spec_name, - spec_value=spec_value, - quantity=quantity, - amount=amount, - order_status='processed', # 已处理状态 - cookie_id=self.cookie_id - ) + # 获取解析后的规格信息 + spec_name = result.get('spec_name', '') + spec_value = result.get('spec_value', '') + quantity = result.get('quantity', '') + amount = result.get('amount', '') - if success: - logger.info(f"【{self.cookie_id}】订单信息已保存到数据库: {order_id}") - print(f"💾 【{self.cookie_id}】订单 {order_id} 信息已保存到数据库") + if spec_name and spec_value: + logger.info(f"【{self.cookie_id}】📋 规格名称: {spec_name}") + logger.info(f"【{self.cookie_id}】📝 规格值: {spec_value}") + print(f"🛍️ 【{self.cookie_id}】订单 {order_id} 规格信息: {spec_name} -> {spec_value}") else: - logger.warning(f"【{self.cookie_id}】订单信息保存失败: {order_id}") + logger.warning(f"【{self.cookie_id}】未获取到有效的规格信息") + print(f"⚠️ 【{self.cookie_id}】订单 {order_id} 规格信息获取失败") - except Exception as db_e: - logger.error(f"【{self.cookie_id}】保存订单信息到数据库失败: {self._safe_str(db_e)}") + # 插入或更新订单信息到数据库 + try: + success = db_manager.insert_or_update_order( + order_id=order_id, + item_id=item_id, + buyer_id=buyer_id, + spec_name=spec_name, + spec_value=spec_value, + quantity=quantity, + amount=amount, + order_status='processed', # 已处理状态 + cookie_id=self.cookie_id + ) - return result - else: - logger.warning(f"【{self.cookie_id}】订单详情获取失败: {order_id}") + if success: + logger.info(f"【{self.cookie_id}】订单信息已保存到数据库: {order_id}") + print(f"💾 【{self.cookie_id}】订单 {order_id} 信息已保存到数据库") + else: + logger.warning(f"【{self.cookie_id}】订单信息保存失败: {order_id}") + + except Exception as db_e: + logger.error(f"【{self.cookie_id}】保存订单信息到数据库失败: {self._safe_str(db_e)}") + + return result + else: + logger.warning(f"【{self.cookie_id}】订单详情获取失败: {order_id}") + return None + + except Exception as e: + logger.error(f"【{self.cookie_id}】获取订单详情异常: {self._safe_str(e)}") return None - except Exception as e: - logger.error(f"【{self.cookie_id}】获取订单详情异常: {self._safe_str(e)}") - return None - async def _auto_delivery(self, item_id: str, item_title: str = None, order_id: str = None, send_user_id: str = None): """自动发货功能 - 获取卡券规则,执行延时,确认发货,发送内容""" try: @@ -2457,7 +2637,9 @@ class XianyuLive: logger.info("Token即将过期,准备刷新...") new_token = await self.refresh_token() if new_token: - logger.info(f"【{self.cookie_id}】Token刷新成功,准备重新建立连接...") + logger.info(f"【{self.cookie_id}】Token刷新成功,准备重启实例...") + # 注意:refresh_token方法中已经调用了_restart_instance() + # 这里只需要关闭当前连接,让main循环重新开始 self.connection_restart_flag = True if self.ws: await self.ws.close() @@ -2605,6 +2787,7 @@ class XianyuLive: } await ws.send(json.dumps(msg)) self.last_heartbeat_time = time.time() + logger.debug(f"【{self.cookie_id}】心跳包已发送") async def heartbeat_loop(self, ws): """心跳循环""" @@ -2634,22 +2817,25 @@ class XianyuLive: return False async def pause_cleanup_loop(self): - """定期清理过期的暂停记录""" + """定期清理过期的暂停记录和锁""" while True: try: # 检查账号是否启用 from cookie_manager import manager as cookie_manager if cookie_manager and not cookie_manager.get_cookie_status(self.cookie_id): - logger.info(f"【{self.cookie_id}】账号已禁用,停止暂停记录清理循环") + logger.info(f"【{self.cookie_id}】账号已禁用,停止清理循环") break # 清理过期的暂停记录 pause_manager.cleanup_expired_pauses() - + + # 清理过期的锁(每5分钟清理一次,保留24小时内的锁) + self.cleanup_expired_locks(max_age_hours=24) + # 每5分钟清理一次 await asyncio.sleep(300) except Exception as e: - logger.error(f"【{self.cookie_id}】暂停记录清理失败: {self._safe_str(e)}") + logger.error(f"【{self.cookie_id}】清理任务失败: {self._safe_str(e)}") await asyncio.sleep(300) # 出错后也等待5分钟再重试 async def send_msg_once(self, toid, item_id, text): @@ -3098,32 +3284,7 @@ class XianyuLive: - # 自动回复消息 - if not AUTO_REPLY.get('enabled', True): - logger.info(f"[{msg_time}] 【{self.cookie_id}】【系统】自动回复已禁用") - return - - # 检查该chat_id是否处于暂停状态 - if pause_manager.is_chat_paused(chat_id): - remaining_time = pause_manager.get_remaining_pause_time(chat_id) - remaining_minutes = remaining_time // 60 - remaining_seconds = remaining_time % 60 - logger.info(f"[{msg_time}] 【{self.cookie_id}】【系统】chat_id {chat_id} 自动回复已暂停,剩余时间: {remaining_minutes}分{remaining_seconds}秒") - return - - # 构造用户URL - user_url = f'https://www.goofish.com/personal?userId={send_user_id}' - - reply = None - # 判断是否启用API回复 - if AUTO_REPLY.get('api', {}).get('enabled', False): - reply = await self.get_api_reply( - msg_time, user_url, send_user_id, send_user_name, - item_id, send_message, chat_id - ) - if not reply: - logger.error(f"[{msg_time}] 【API调用失败】用户: {send_user_name} (ID: {send_user_id}), 商品({item_id}): {send_message}") - + # 【优先处理】检查系统消息和自动发货触发消息(不受人工接入暂停影响) if send_message == '[我已拍下,待付款]': logger.info(f'[{msg_time}] 【{self.cookie_id}】系统消息不处理') return @@ -3151,13 +3312,14 @@ class XianyuLive: elif send_message == '[你已发货]': logger.info(f'[{msg_time}] 【{self.cookie_id}】发货确认消息不处理') return - # 检查是否为自动发货触发消息 + # 【重要】检查是否为自动发货触发消息 - 即使在人工接入暂停期间也要处理 elif self._is_auto_delivery_trigger(send_message): + logger.info(f'[{msg_time}] 【{self.cookie_id}】检测到自动发货触发消息,即使在暂停期间也继续处理: {send_message}') # 使用统一的自动发货处理方法 await self._handle_auto_delivery(websocket, message, send_user_name, send_user_id, item_id, chat_id, msg_time) return - + # 【重要】检查是否为"我已小刀,待刀成"卡片消息 - 即使在人工接入暂停期间也要处理 elif send_message == '[卡片消息]': # 检查是否为"我已小刀,待刀成"的卡片消息 try: @@ -3183,7 +3345,7 @@ class XianyuLive: # 检查是否为"我已小刀,待刀成" if card_title == "我已小刀,待刀成": - logger.info(f'[{msg_time}] 【{self.cookie_id}】【系统】检测到"我已小刀,待刀成",准备自动免拼发货') + logger.info(f'[{msg_time}] 【{self.cookie_id}】【系统】检测到"我已小刀,待刀成",即使在暂停期间也继续处理') # 提取订单ID order_id = self._extract_order_id(message) if order_id: @@ -3197,18 +3359,46 @@ class XianyuLive: else: logger.warning(f'[{msg_time}] 【{self.cookie_id}】❌ 自动免拼发货失败: {result.get("error", "未知错误")}') await self._handle_auto_delivery(websocket, message, send_user_name, send_user_id, - item_id, chat_id, msg_time) + item_id, chat_id, msg_time) return else: logger.warning(f'[{msg_time}] 【{self.cookie_id}】❌ 未能提取到订单ID,无法执行免拼发货') return else: logger.info(f'[{msg_time}] 【{self.cookie_id}】收到卡片消息,标题: {card_title or "未知"}') + # 如果不是目标卡片消息,继续正常处理流程(会受到暂停影响) except Exception as e: logger.error(f"处理卡片消息异常: {self._safe_str(e)}") + # 如果处理异常,继续正常处理流程(会受到暂停影响) + + # 自动回复消息 + if not AUTO_REPLY.get('enabled', True): + logger.info(f"[{msg_time}] 【{self.cookie_id}】【系统】自动回复已禁用") + return + + # 检查该chat_id是否处于暂停状态 + if pause_manager.is_chat_paused(chat_id): + remaining_time = pause_manager.get_remaining_pause_time(chat_id) + remaining_minutes = remaining_time // 60 + remaining_seconds = remaining_time % 60 + logger.info(f"[{msg_time}] 【{self.cookie_id}】【系统】chat_id {chat_id} 自动回复已暂停,剩余时间: {remaining_minutes}分{remaining_seconds}秒") + return + + # 构造用户URL + user_url = f'https://www.goofish.com/personal?userId={send_user_id}' + + reply = None + # 判断是否启用API回复 + if AUTO_REPLY.get('api', {}).get('enabled', False): + reply = await self.get_api_reply( + msg_time, user_url, send_user_id, send_user_name, + item_id, send_message, chat_id + ) + if not reply: + logger.error(f"[{msg_time}] 【API调用失败】用户: {send_user_name} (ID: {send_user_id}), 商品({item_id}): {send_message}") + - # 如果不是目标卡片消息,继续正常处理流程 # 记录回复来源 reply_source = 'API' # 默认假设是API回复 @@ -3316,8 +3506,11 @@ class XianyuLive: self.cleanup_task = asyncio.create_task(self.pause_cleanup_loop()) logger.info(f"【{self.cookie_id}】开始监听WebSocket消息...") + logger.info(f"【{self.cookie_id}】WebSocket连接状态正常,等待服务器消息...") + logger.info(f"【{self.cookie_id}】准备进入消息循环...") async for message in websocket: + logger.info(f"【{self.cookie_id}】收到WebSocket消息: {len(message) if message else 0} 字节") try: message_data = json.loads(message) @@ -3326,7 +3519,8 @@ class XianyuLive: continue # 处理其他消息 - await self.handle_message(message_data, websocket) + # 使用异步任务处理消息,防止阻塞后续消息接收 + asyncio.create_task(self.handle_message(message_data, websocket)) except Exception as e: logger.error(f"处理消息出错: {self._safe_str(e)}") @@ -3364,22 +3558,6 @@ class XianyuLive: logger.error("获取商品信息失败,重试次数过多") return {"error": "获取商品信息失败,重试次数过多"} - # 如果是重试(retry_count > 0),强制刷新token - if retry_count > 0: - old_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else '' - logger.info(f"重试第{retry_count}次,强制刷新token... 当前_m_h5_tk: {old_token}") - await self.refresh_token() - new_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else '' - logger.info(f"重试刷新token完成,新的_m_h5_tk: {new_token}") - else: - # 确保使用最新的token(首次调用时的正常逻辑) - if not self.current_token or (time.time() - self.last_token_refresh_time) >= self.token_refresh_interval: - old_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else '' - logger.info(f"Token过期或不存在,刷新token... 当前_m_h5_tk: {old_token}") - await self.refresh_token() - new_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else '' - logger.info(f"Token刷新完成,新的_m_h5_tk: {new_token}") - # 确保session已创建 if not self.session: await self.create_session() diff --git a/secure_confirm_decrypted.py b/secure_confirm_decrypted.py index 09bde9d..8d1490e 100644 --- a/secure_confirm_decrypted.py +++ b/secure_confirm_decrypted.py @@ -73,116 +73,6 @@ class SecureConfirm: logger.error(f"【{self.cookie_id}】获取真实商品ID失败: {self._safe_str(e)}") return None - async def refresh_token_by_detail_api(self, retry_count=0): - """通过商品详情API刷新token - 参照get_item_info方法""" - if retry_count >= 3: # 最多重试2次 - logger.error(f"【{self.cookie_id}】通过详情API刷新token失败,重试次数过多") - return False - - try: - # 优先使用传入的item_id,否则从数据库获取 - real_item_id = None - if hasattr(self, '_current_item_id') and self._current_item_id: - real_item_id = self._current_item_id - logger.debug(f"【{self.cookie_id}】使用传入的商品ID: {real_item_id}") - else: - # 从数据库中获取一个真实的商品ID来请求详情API - real_item_id = await self._get_real_item_id() - - if not real_item_id: - logger.warning(f"【{self.cookie_id}】无法获取真实商品ID,使用默认ID") - real_item_id = "123456789" # 备用ID - - params = { - 'jsv': '2.7.2', - 'appKey': '34839810', - 't': str(int(time.time()) * 1000), - 'sign': '', - 'v': '1.0', - 'type': 'originaljson', - 'accountSite': 'xianyu', - 'dataType': 'json', - 'timeout': '20000', - 'api': 'mtop.taobao.idle.pc.detail', - 'sessionOption': 'AutoLoginOnly', - } - - data_val = f'{{"itemId":"{real_item_id}"}}' - data = { - 'data': data_val, - } - - # 从当前cookies中获取token - token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else '' - - if token: - logger.debug(f"【{self.cookie_id}】使用当前token刷新: {token}") - else: - logger.warning(f"【{self.cookie_id}】当前cookies中没有找到token") - - # 生成签名 - sign = generate_sign(params['t'], token, data_val) - params['sign'] = sign - - logger.info(f"【{self.cookie_id}】通过详情API刷新token,使用商品ID: {real_item_id}") - - async with self.session.post( - 'https://h5api.m.goofish.com/h5/mtop.taobao.idle.pc.detail/1.0/', - params=params, - data=data - ) as response: - # 检查并更新Cookie - if 'set-cookie' in response.headers: - new_cookies = {} - for cookie in response.headers.getall('set-cookie', []): - if '=' in cookie: - name, value = cookie.split(';')[0].split('=', 1) - new_cookies[name.strip()] = value.strip() - - # 更新cookies - if new_cookies: - self.cookies.update(new_cookies) - # 生成新的cookie字符串 - self.cookies_str = '; '.join([f"{k}={v}" for k, v in self.cookies.items()]) - # 更新数据库中的Cookie - await self._update_config_cookies() - logger.debug(f"【{self.cookie_id}】已通过详情API更新Cookie到数据库") - - # 获取新的token - new_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else '' - if new_token and new_token != token: - self.current_token = new_token - self.last_token_refresh_time = time.time() - logger.info(f"【{self.cookie_id}】通过详情API成功刷新token: {new_token}") - return True - else: - logger.debug(f"【{self.cookie_id}】详情API返回的token未变化") - - # 检查响应状态 - try: - res_json = await response.json() - if isinstance(res_json, dict): - ret_value = res_json.get('ret', []) - if any('SUCCESS::调用成功' in ret for ret in ret_value): - logger.debug(f"【{self.cookie_id}】详情API调用成功") - return True - else: - logger.warning(f"【{self.cookie_id}】详情API调用失败: {ret_value}") - if retry_count < 2: - await asyncio.sleep(0.5) - return await self.refresh_token_by_detail_api(retry_count + 1) - except: - logger.debug(f"【{self.cookie_id}】详情API响应解析失败,但可能已获取到新cookies") - - return bool(self.current_token) - - except Exception as e: - logger.error(f"【{self.cookie_id}】通过详情API刷新token异常: {self._safe_str(e)}") - if retry_count < 2: - await asyncio.sleep(0.5) - return await self.refresh_token_by_detail_api(retry_count + 1) - return False - async def _update_config_cookies(self): """更新数据库中的Cookie配置""" try: @@ -193,41 +83,6 @@ class SecureConfirm: except Exception as e: logger.error(f"【{self.cookie_id}】更新数据库Cookie失败: {self._safe_str(e)}") - async def refresh_token(self): - """刷新token - 优先使用详情API,失败时调用主界面类的方法""" - # 首先尝试通过详情API刷新token - success = await self.refresh_token_by_detail_api() - if success: - return self.current_token - - # 如果详情API失败,尝试调用主界面类的方法 - if self.main_instance and hasattr(self.main_instance, 'refresh_token'): - try: - logger.debug(f"【{self.cookie_id}】详情API刷新失败,调用主界面类的refresh_token方法") - new_token = await self.main_instance.refresh_token() - if new_token: - self.current_token = new_token - self.last_token_refresh_time = time.time() - # 更新本地的cookies_str - self.cookies_str = self.main_instance.cookies_str - # 重新解析cookies - self.cookies = {} - if self.cookies_str: - for cookie in self.cookies_str.split(';'): - if '=' in cookie: - key, value = cookie.strip().split('=', 1) - self.cookies[key] = value - logger.debug(f"【{self.cookie_id}】通过主界面类Token刷新成功,已同步cookies") - return new_token - else: - logger.warning(f"【{self.cookie_id}】主界面类Token刷新失败") - return None - except Exception as e: - logger.error(f"【{self.cookie_id}】调用主界面类refresh_token失败: {self._safe_str(e)}") - return None - else: - logger.warning(f"【{self.cookie_id}】主界面类实例不存在或没有refresh_token方法") - return None async def auto_confirm(self, order_id, item_id=None, retry_count=0): """自动确认发货 - 使用真实商品ID刷新token""" @@ -240,22 +95,6 @@ class SecureConfirm: self._current_item_id = item_id logger.debug(f"【{self.cookie_id}】设置当前商品ID: {item_id}") - # 如果是重试(retry_count > 0),强制刷新token - if retry_count > 0: - old_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else '' - logger.info(f"【{self.cookie_id}】重试第{retry_count}次,强制刷新token... 当前_m_h5_tk: {old_token}") - await self.refresh_token() - new_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else '' - logger.info(f"【{self.cookie_id}】重试刷新token完成,新的_m_h5_tk: {new_token}") - else: - # 确保使用最新的token(首次调用时的正常逻辑) - if not self.current_token or (time.time() - self.last_token_refresh_time) >= self.token_refresh_interval: - old_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else '' - logger.info(f"【{self.cookie_id}】Token过期或不存在,刷新token... 当前_m_h5_tk: {old_token}") - await self.refresh_token() - new_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else '' - logger.info(f"【{self.cookie_id}】Token刷新完成,新的_m_h5_tk: {new_token}") - # 确保session已创建 if not self.session: raise Exception("Session未创建") diff --git a/secure_freeshipping_decrypted.py b/secure_freeshipping_decrypted.py new file mode 100644 index 0000000..d8e21f9 --- /dev/null +++ b/secure_freeshipping_decrypted.py @@ -0,0 +1,134 @@ +import asyncio +import time +from loguru import logger +from utils.xianyu_utils import trans_cookies, generate_sign + + +class SecureFreeshipping: + def __init__(self, session, cookies_str, cookie_id): + self.session = session + self.cookies_str = cookies_str + self.cookie_id = cookie_id + self.cookies = trans_cookies(cookies_str) if cookies_str else {} + + # 这些属性将由主类传递 + self.current_token = None + self.last_token_refresh_time = None + self.token_refresh_interval = None + + def _safe_str(self, obj): + """安全转换为字符串""" + try: + return str(obj) + except: + return "无法转换的对象" + + async def update_config_cookies(self): + """更新数据库中的cookies""" + try: + from db_manager import db_manager + + # 更新数据库中的Cookie + db_manager.update_config_cookies(self.cookie_id, self.cookies_str) + logger.debug(f"【{self.cookie_id}】Cookie已更新到数据库") + + except Exception as e: + logger.error(f"【{self.cookie_id}】更新Cookie到数据库失败: {self._safe_str(e)}") + + async def auto_freeshipping(self, order_id, item_id, buyer_id, retry_count=0): + """自动免拼发货 - 加密版本""" + if retry_count >= 4: # 最多重试3次 + logger.error("免拼发货发货失败,重试次数过多") + return {"error": "免拼发货发货失败,重试次数过多"} + + # 确保session已创建 + if not self.session: + raise Exception("Session未创建") + + params = { + 'jsv': '2.7.2', + 'appKey': '34839810', + 't': str(int(time.time()) * 1000), + 'sign': '', + 'v': '1.0', + 'type': 'originaljson', + 'accountSite': 'xianyu', + 'dataType': 'json', + 'timeout': '20000', + 'api': 'mtop.idle.groupon.activity.seller.freeshipping', + 'sessionOption': 'AutoLoginOnly', + } + + data_val = '{"bizOrderId":"' + order_id + '", "itemId":' + item_id + ',"buyerId":' + buyer_id + '}' + data = { + 'data': data_val, + } + + # 打印参数信息 + logger.info(f"【{self.cookie_id}】免拼发货请求参数: data_val = {data_val}") + logger.info(f"【{self.cookie_id}】参数详情 - order_id: {order_id}, item_id: {item_id}, buyer_id: {buyer_id}") + + # 始终从最新的cookies中获取_m_h5_tk token(刷新后cookies会被更新) + token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else '' + + if token: + logger.info(f"使用cookies中的_m_h5_tk token: {token}") + else: + logger.warning("cookies中没有找到_m_h5_tk token") + + sign = generate_sign(params['t'], token, data_val) + params['sign'] = sign + + try: + logger.info(f"【{self.cookie_id}】开始自动免拼发货,订单ID: {order_id}") + async with self.session.post( + 'https://h5api.m.goofish.com/h5/mtop.idle.groupon.activity.seller.freeshipping/1.0/', + params=params, + data=data + ) as response: + res_json = await response.json() + + # 检查并更新Cookie + if 'set-cookie' in response.headers: + new_cookies = {} + for cookie in response.headers.getall('set-cookie', []): + if '=' in cookie: + name, value = cookie.split(';')[0].split('=', 1) + new_cookies[name.strip()] = value.strip() + + # 更新cookies + if new_cookies: + self.cookies.update(new_cookies) + # 生成新的cookie字符串 + self.cookies_str = '; '.join([f"{k}={v}" for k, v in self.cookies.items()]) + # 更新数据库中的Cookie + await self.update_config_cookies() + logger.debug("已更新Cookie到数据库") + + logger.info(f"【{self.cookie_id}】自动免拼发货响应: {res_json}") + + # 检查响应结果 + if res_json.get('ret') and res_json['ret'][0] == 'SUCCESS::调用成功': + logger.info(f"【{self.cookie_id}】✅ 自动免拼发货成功,订单ID: {order_id}") + return {"success": True, "order_id": order_id} + else: + error_msg = res_json.get('ret', ['未知错误'])[0] if res_json.get('ret') else '未知错误' + logger.warning(f"【{self.cookie_id}】❌ 自动免拼发货失败: {error_msg}") + + # 如果是token相关错误,进行重试 + if 'token' in error_msg.lower() or 'sign' in error_msg.lower(): + logger.info(f"【{self.cookie_id}】检测到token错误,准备重试...") + return await self.auto_freeshipping(order_id, item_id, buyer_id, retry_count + 1) + + return {"error": error_msg, "order_id": order_id} + + except Exception as e: + logger.error(f"【{self.cookie_id}】自动免拼发货API请求异常: {self._safe_str(e)}") + await asyncio.sleep(0.5) + + # 网络异常也进行重试 + if retry_count < 2: + logger.info(f"【{self.cookie_id}】网络异常,准备重试...") + return await self.auto_freeshipping(order_id, item_id, buyer_id, retry_count + 1) + + return {"error": f"网络异常: {self._safe_str(e)}", "order_id": order_id}