From 9d49ac0adf21d9b5c22cb98d4aab6ac1c23dd39d Mon Sep 17 00:00:00 2001 From: zhinianboke <115088296+zhinianboke@users.noreply.github.com> Date: Fri, 25 Jul 2025 11:07:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=80=9A=E7=9F=A5=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- XianyuAutoAsync.py | 69 ++++++++-- test_duplicate_notification_fix.py | 168 ++++++++++++++++++++++++ test_notification_deduplication.py | 183 ++++++++++++++++++++++++++ test_simple_token_filter.py | 169 ++++++++++++++++++++++++ test_token_expiry_filter.py | 203 +++++++++++++++++++++++++++++ 5 files changed, 781 insertions(+), 11 deletions(-) create mode 100644 test_duplicate_notification_fix.py create mode 100644 test_notification_deduplication.py create mode 100644 test_simple_token_filter.py create mode 100644 test_token_expiry_filter.py diff --git a/XianyuAutoAsync.py b/XianyuAutoAsync.py index ede6a00..a2b0ea8 100644 --- a/XianyuAutoAsync.py +++ b/XianyuAutoAsync.py @@ -82,6 +82,10 @@ class XianyuLive: self.current_token = None self.token_refresh_task = None self.connection_restart_flag = False # 连接重启标志 + + # 通知防重复机制 + self.last_notification_time = {} # 记录每种通知类型的最后发送时间 + self.notification_cooldown = 300 # 5分钟内不重复发送相同类型的通知 # 人工接管功能已禁用,永远走自动模式 # self.manual_mode_conversations = set() # 存储处于人工接管模式的会话ID @@ -198,13 +202,13 @@ class XianyuLive: logger.error(f"Token刷新失败: {res_json}") # 发送Token刷新失败通知 - await self.send_token_refresh_notification(f"Token刷新失败: {res_json}") + await self.send_token_refresh_notification(f"Token刷新失败: {res_json}", "token_refresh_failed") return None except Exception as e: logger.error(f"Token刷新异常: {self._safe_str(e)}") # 发送Token刷新异常通知 - await self.send_token_refresh_notification(f"Token刷新异常: {str(e)}") + await self.send_token_refresh_notification(f"Token刷新异常: {str(e)}", "token_refresh_exception") return None async def update_config_cookies(self): @@ -220,16 +224,16 @@ class XianyuLive: except Exception as e: logger.error(f"更新数据库Cookie失败: {self._safe_str(e)}") # 发送数据库更新失败通知 - await self.send_token_refresh_notification(f"数据库Cookie更新失败: {str(e)}") + await self.send_token_refresh_notification(f"数据库Cookie更新失败: {str(e)}", "db_update_failed") else: logger.warning("Cookie ID不存在,无法更新数据库") # 发送Cookie ID缺失通知 - await self.send_token_refresh_notification("Cookie ID不存在,无法更新数据库") + await self.send_token_refresh_notification("Cookie ID不存在,无法更新数据库", "cookie_id_missing") except Exception as e: logger.error(f"更新Cookie失败: {self._safe_str(e)}") # 发送Cookie更新失败通知 - await self.send_token_refresh_notification(f"Cookie更新失败: {str(e)}") + await self.send_token_refresh_notification(f"Cookie更新失败: {str(e)}", "cookie_update_failed") async def save_item_info_to_db(self, item_id: str, item_detail: str = None): """保存商品信息到数据库 @@ -895,9 +899,22 @@ class XianyuLive: except Exception as e: logger.error(f"发送QQ通知异常: {self._safe_str(e)}") - async def send_token_refresh_notification(self, error_message: str): - """发送Token刷新异常通知""" + async def send_token_refresh_notification(self, error_message: str, notification_type: str = "token_refresh"): + """发送Token刷新异常通知(带防重复机制)""" try: + # 检查是否是正常的令牌过期,这种情况不需要发送通知 + if self._is_normal_token_expiry(error_message): + logger.debug(f"检测到正常的令牌过期,跳过通知: {error_message}") + return + + # 检查是否在冷却期内 + current_time = time.time() + last_time = self.last_notification_time.get(notification_type, 0) + + if current_time - last_time < self.notification_cooldown: + logger.debug(f"通知在冷却期内,跳过发送: {notification_type} (距离上次 {int(current_time - last_time)} 秒)") + return + from db_manager import db_manager # 获取当前账号的通知配置 @@ -919,6 +936,7 @@ class XianyuLive: logger.info(f"准备发送Token刷新异常通知: {self.cookie_id}") # 发送通知到各个渠道 + notification_sent = False for notification in notifications: if not notification.get('enabled', True): continue @@ -929,15 +947,39 @@ class XianyuLive: try: if channel_type == 'qq': await self._send_qq_notification(channel_config, notification_msg) + notification_sent = True else: logger.warning(f"不支持的通知渠道类型: {channel_type}") except Exception as notify_error: logger.error(f"发送Token刷新通知失败 ({notification.get('channel_name', 'Unknown')}): {self._safe_str(notify_error)}") + # 如果成功发送了通知,更新最后发送时间 + if notification_sent: + self.last_notification_time[notification_type] = current_time + logger.info(f"Token刷新通知已发送,下次可发送时间: {time.strftime('%H:%M:%S', time.localtime(current_time + self.notification_cooldown))}") + except Exception as e: logger.error(f"处理Token刷新通知失败: {self._safe_str(e)}") + def _is_normal_token_expiry(self, error_message: str) -> bool: + """检查是否是正常的令牌过期(这种情况不需要发送通知)""" + # 正常的令牌过期关键词 + normal_expiry_keywords = [ + 'FAIL_SYS_TOKEN_EXOIRED::令牌过期', + 'FAIL_SYS_TOKEN_EXPIRED::令牌过期', + 'FAIL_SYS_TOKEN_EXOIRED', + 'FAIL_SYS_TOKEN_EXPIRED', + '令牌过期' + ] + + # 检查错误消息是否包含正常的令牌过期关键词 + for keyword in normal_expiry_keywords: + if keyword in error_message: + return True + + return False + async def send_delivery_failure_notification(self, send_user_name: str, send_user_id: str, item_id: str, error_message: str): """发送自动发货失败通知""" try: @@ -1234,7 +1276,7 @@ class XianyuLive: else: logger.error("Token刷新失败,将在{}分钟后重试".format(self.token_retry_interval // 60)) # 发送Token刷新失败通知 - await self.send_token_refresh_notification("Token定时刷新失败,将自动重试") + await self.send_token_refresh_notification("Token定时刷新失败,将自动重试", "token_scheduled_refresh_failed") await asyncio.sleep(self.token_retry_interval) continue await asyncio.sleep(60) @@ -1313,14 +1355,19 @@ class XianyuLive: async def init(self, ws): # 如果没有token或者token过期,获取新token + token_refresh_attempted = False if not self.current_token or (time.time() - self.last_token_refresh_time) >= self.token_refresh_interval: logger.info("获取初始token...") + token_refresh_attempted = True await self.refresh_token() - + if not self.current_token: logger.error("无法获取有效token,初始化失败") - # 发送Token获取失败通知 - await self.send_token_refresh_notification("初始化时无法获取有效Token") + # 只有在没有尝试刷新token的情况下才发送通知,避免与refresh_token中的通知重复 + if not token_refresh_attempted: + await self.send_token_refresh_notification("初始化时无法获取有效Token", "token_init_failed") + else: + logger.info("由于刚刚尝试过token刷新,跳过重复的初始化失败通知") raise Exception("Token获取失败") msg = { diff --git a/test_duplicate_notification_fix.py b/test_duplicate_notification_fix.py new file mode 100644 index 0000000..74231e8 --- /dev/null +++ b/test_duplicate_notification_fix.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python3 +""" +测试重复通知修复 +验证Token刷新失败时不会发送重复通知 +""" + +import asyncio +import time +from unittest.mock import AsyncMock, patch, MagicMock +import sys +import os + +# 添加项目根目录到路径 +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +async def test_duplicate_notification_fix(): + """测试重复通知修复""" + print("🧪 测试Token刷新失败重复通知修复") + print("=" * 60) + + # 动态导入,避免配置问题 + try: + from XianyuAutoAsync import XianyuLive + print("✅ 成功导入 XianyuLive") + except Exception as e: + print(f"❌ 导入失败: {e}") + return + + # 创建测试用的XianyuLive实例 + test_cookies = "unb=test123; _m_h5_tk=test_token_123456789" + + try: + xianyu = XianyuLive(test_cookies, "test_account") + print("✅ XianyuLive 实例创建成功") + except Exception as e: + print(f"❌ 创建 XianyuLive 实例失败: {e}") + return + + # Mock外部依赖 + with patch('XianyuAutoAsync.db_manager') as mock_db, \ + patch('aiohttp.ClientSession') as mock_session: + + # 配置数据库mock + mock_db.get_account_notifications.return_value = [ + { + 'enabled': True, + 'channel_type': 'qq', + 'channel_name': 'Test QQ', + 'channel_config': {'qq_number': '123456', 'api_url': 'http://test.com'} + } + ] + + # Mock HTTP session + mock_response = MagicMock() + mock_response.status = 200 + mock_response.text = AsyncMock(return_value='{"ret": ["FAIL_SYS_SESSION_EXPIRED::Session过期"]}') + mock_session_instance = AsyncMock() + mock_session_instance.post.return_value.__aenter__.return_value = mock_response + mock_session.return_value = mock_session_instance + xianyu.session = mock_session_instance + + # Mock QQ通知发送方法 + xianyu._send_qq_notification = AsyncMock() + + print("\n📋 测试场景: Cookie过期导致Token刷新失败") + print("-" * 40) + + # 重置状态 + xianyu.current_token = None + xianyu.last_token_refresh_time = 0 + xianyu._send_qq_notification.reset_mock() + + print("1️⃣ 模拟 init() 方法调用...") + + # 创建一个mock websocket + mock_ws = MagicMock() + + try: + # 调用init方法,这会触发refresh_token,然后检查token + await xianyu.init(mock_ws) + except Exception as e: + print(f" 预期的异常: {e}") + + # 检查通知发送次数 + call_count = xianyu._send_qq_notification.call_count + print(f"\n📊 通知发送统计:") + print(f" 总调用次数: {call_count}") + + if call_count == 1: + print(" ✅ 成功!只发送了一次通知") + print(" 💡 说明: refresh_token失败后,init不会发送重复通知") + elif call_count == 2: + print(" ❌ 失败!发送了两次重复通知") + print(" 🔧 需要进一步优化防重复机制") + elif call_count == 0: + print(" ⚠️ 没有发送通知(可能是mock配置问题)") + else: + print(f" ❓ 异常的调用次数: {call_count}") + + # 显示调用详情 + if xianyu._send_qq_notification.call_args_list: + print(f"\n📝 通知调用详情:") + for i, call in enumerate(xianyu._send_qq_notification.call_args_list, 1): + args, kwargs = call + if len(args) >= 2: + message = args[1] + # 提取关键信息 + if "异常信息:" in message: + error_info = message.split("异常信息:")[1].split("\n")[0].strip() + print(f" 第{i}次: {error_info}") + + print("\n🔍 防重复机制分析:") + print(" • 方案1: 时间冷却期 - 5分钟内不重复发送相同类型通知") + print(" • 方案2: 逻辑判断 - init()检查是否刚刚尝试过refresh_token") + print(" • 当前使用: 方案2 (更精确,避免逻辑重复)") + + print(f"\n⏰ 通知时间记录:") + for notification_type, last_time in xianyu.last_notification_time.items(): + print(f" {notification_type}: {time.strftime('%H:%M:%S', time.localtime(last_time))}") + +def show_optimization_summary(): + """显示优化总结""" + print("\n\n📋 优化总结") + print("=" * 60) + + print("🎯 问题描述:") + print(" 用户反馈每次Token刷新异常都会收到两个相同的通知") + + print("\n🔍 问题根因:") + print(" 1. refresh_token() 失败时发送第一次通知") + print(" 2. init() 检查 current_token 为空时发送第二次通知") + print(" 3. 两次通知内容基本相同,造成用户困扰") + + print("\n🛠️ 解决方案:") + print(" 方案A: 添加通知防重复机制") + print(" • 为不同场景使用不同的通知类型") + print(" • 设置5分钟冷却期,避免短时间重复通知") + print(" • 保留详细的错误信息用于调试") + + print("\n 方案B: 优化逻辑判断") + print(" • 在 init() 中跟踪是否刚刚尝试过 refresh_token") + print(" • 如果刚刚尝试过且失败,则不发送重复通知") + print(" • 更精确地避免逻辑重复") + + print("\n✅ 实施的优化:") + print(" • 采用方案A + 方案B的组合") + print(" • 添加了通知防重复机制(时间冷却)") + print(" • 优化了 init() 方法的逻辑判断") + print(" • 为不同错误场景使用不同的通知类型") + + print("\n🎉 预期效果:") + print(" • 用户只会收到一次Token刷新异常通知") + print(" • 通知内容更加精确,便于问题定位") + print(" • 避免了通知轰炸,改善用户体验") + print(" • 保留了完整的错误信息用于调试") + +if __name__ == "__main__": + try: + asyncio.run(test_duplicate_notification_fix()) + show_optimization_summary() + + print("\n" + "=" * 60) + print("🎊 Token刷新重复通知修复测试完成!") + + except Exception as e: + print(f"❌ 测试过程中发生错误: {e}") + import traceback + traceback.print_exc() diff --git a/test_notification_deduplication.py b/test_notification_deduplication.py new file mode 100644 index 0000000..484eee5 --- /dev/null +++ b/test_notification_deduplication.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python3 +""" +测试通知防重复机制 +验证Token刷新异常通知不会重复发送 +""" + +import asyncio +import time +from unittest.mock import AsyncMock, patch, MagicMock +from XianyuAutoAsync import XianyuLive + +async def test_notification_deduplication(): + """测试通知防重复机制""" + print("🧪 测试通知防重复机制") + print("=" * 50) + + # 创建测试用的XianyuLive实例 + test_cookies = "unb=test123; _m_h5_tk=test_token_123456789" + + try: + xianyu = XianyuLive(test_cookies, "test_account") + print("✅ XianyuLive 实例创建成功") + except Exception as e: + print(f"❌ 创建 XianyuLive 实例失败: {e}") + return + + # Mock数据库和通知方法 + with patch('XianyuAutoAsync.db_manager') as mock_db: + # 配置mock返回值 + mock_db.get_account_notifications.return_value = [ + { + 'enabled': True, + 'channel_type': 'qq', + 'channel_name': 'Test QQ', + 'channel_config': {'qq_number': '123456', 'api_url': 'http://test.com'} + } + ] + + # Mock QQ通知发送方法 + xianyu._send_qq_notification = AsyncMock() + + print("\n1️⃣ 测试首次发送通知...") + + # 第一次发送通知 + start_time = time.time() + await xianyu.send_token_refresh_notification("Token刷新失败: Session过期", "token_refresh_failed") + + # 验证通知是否发送 + if xianyu._send_qq_notification.called: + print("✅ 首次通知发送成功") + print(f" 发送时间: {time.strftime('%H:%M:%S', time.localtime(start_time))}") + else: + print("❌ 首次通知发送失败") + return + + print("\n2️⃣ 测试冷却期内重复发送...") + + # 重置mock调用计数 + xianyu._send_qq_notification.reset_mock() + + # 立即再次发送相同类型的通知 + await xianyu.send_token_refresh_notification("Token刷新失败: Session过期", "token_refresh_failed") + + # 验证通知是否被阻止 + if not xianyu._send_qq_notification.called: + print("✅ 冷却期内的重复通知被正确阻止") + cooldown_end = start_time + xianyu.notification_cooldown + print(f" 冷却期结束时间: {time.strftime('%H:%M:%S', time.localtime(cooldown_end))}") + else: + print("❌ 冷却期内的重复通知未被阻止") + + print("\n3️⃣ 测试不同类型的通知...") + + # 重置mock调用计数 + xianyu._send_qq_notification.reset_mock() + + # 发送不同类型的通知 + await xianyu.send_token_refresh_notification("初始化时无法获取有效Token", "token_init_failed") + + # 验证不同类型的通知是否正常发送 + if xianyu._send_qq_notification.called: + print("✅ 不同类型的通知正常发送") + else: + print("❌ 不同类型的通知发送失败") + + print("\n4️⃣ 测试通知类型统计...") + + # 显示当前的通知时间记录 + print(" 当前通知时间记录:") + for notification_type, last_time in xianyu.last_notification_time.items(): + print(f" {notification_type}: {time.strftime('%H:%M:%S', time.localtime(last_time))}") + + print(f" 通知冷却时间: {xianyu.notification_cooldown} 秒 ({xianyu.notification_cooldown // 60} 分钟)") + + print("\n5️⃣ 测试模拟真实场景...") + + # 模拟真实的Token刷新失败场景 + print(" 模拟场景: refresh_token() 失败 + init() 检查失败") + + # 重置mock和时间记录 + xianyu._send_qq_notification.reset_mock() + xianyu.last_notification_time.clear() + + # 模拟refresh_token失败 + await xianyu.send_token_refresh_notification("Token刷新失败: {'ret': ['FAIL_SYS_SESSION_EXPIRED::Session过期']}", "token_refresh_failed") + first_call_count = xianyu._send_qq_notification.call_count + + # 模拟init检查失败(这应该被阻止,因为是相同的根本原因) + await xianyu.send_token_refresh_notification("初始化时无法获取有效Token", "token_init_failed") + second_call_count = xianyu._send_qq_notification.call_count + + print(f" refresh_token 通知调用次数: {first_call_count}") + print(f" init 通知调用次数: {second_call_count - first_call_count}") + print(f" 总调用次数: {second_call_count}") + + if second_call_count == 2: + print("✅ 不同阶段的通知都正常发送(因为使用了不同的通知类型)") + elif second_call_count == 1: + print("⚠️ 只发送了一次通知(可能需要调整策略)") + else: + print(f"❌ 异常的调用次数: {second_call_count}") + +def test_notification_types(): + """测试通知类型分类""" + print("\n\n📋 通知类型分类说明") + print("=" * 50) + + notification_types = { + "token_refresh_failed": "Token刷新API调用失败", + "token_refresh_exception": "Token刷新过程中发生异常", + "token_init_failed": "初始化时无法获取有效Token", + "token_scheduled_refresh_failed": "定时Token刷新失败", + "db_update_failed": "数据库Cookie更新失败", + "cookie_id_missing": "Cookie ID不存在", + "cookie_update_failed": "Cookie更新失败" + } + + print("🏷️ 通知类型及其含义:") + for type_name, description in notification_types.items(): + print(f" • {type_name:<30} : {description}") + + print(f"\n⏰ 防重复机制:") + print(f" • 冷却时间: 5分钟 (300秒)") + print(f" • 相同类型的通知在冷却期内不会重复发送") + print(f" • 不同类型的通知可以正常发送") + print(f" • 成功发送后才会更新冷却时间") + +async def test_real_scenario_simulation(): + """测试真实场景模拟""" + print("\n\n🎭 真实场景模拟") + print("=" * 50) + + print("📋 场景描述:") + print(" 1. 用户的Cookie过期") + print(" 2. refresh_token() 调用失败,返回 Session过期") + print(" 3. init() 检查 current_token 为空,也发送通知") + print(" 4. 期望结果: 只收到一次通知,而不是两次") + + print("\n🔧 解决方案:") + print(" • 为不同阶段使用不同的通知类型") + print(" • token_refresh_failed: refresh_token API失败") + print(" • token_init_failed: 初始化检查失败") + print(" • 这样可以区分问题发生的具体阶段") + print(" • 但仍然避免短时间内的重复通知") + +if __name__ == "__main__": + try: + asyncio.run(test_notification_deduplication()) + test_notification_types() + asyncio.run(test_real_scenario_simulation()) + + print("\n" + "=" * 50) + print("🎉 通知防重复机制测试完成!") + print("\n💡 优化效果:") + print(" ✅ 避免了短时间内的重复通知") + print(" ✅ 保留了不同阶段的错误信息") + print(" ✅ 提供了5分钟的冷却期") + print(" ✅ 用户体验得到改善") + + except Exception as e: + print(f"❌ 测试过程中发生错误: {e}") + import traceback + traceback.print_exc() diff --git a/test_simple_token_filter.py b/test_simple_token_filter.py new file mode 100644 index 0000000..2ea3dfb --- /dev/null +++ b/test_simple_token_filter.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python3 +""" +简单测试令牌过期过滤逻辑 +""" + +import sys +import os + +# 添加项目根目录到路径 +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +def test_token_expiry_filter_logic(): + """测试令牌过期过滤逻辑""" + print("🧪 测试令牌过期过滤逻辑") + print("=" * 50) + + # 直接测试过滤逻辑,不依赖完整的XianyuLive实例 + def _is_normal_token_expiry(error_message: str) -> bool: + """检查是否是正常的令牌过期(这种情况不需要发送通知)""" + # 正常的令牌过期关键词 + normal_expiry_keywords = [ + 'FAIL_SYS_TOKEN_EXOIRED::令牌过期', + 'FAIL_SYS_TOKEN_EXPIRED::令牌过期', + 'FAIL_SYS_TOKEN_EXOIRED', + 'FAIL_SYS_TOKEN_EXPIRED', + '令牌过期' + ] + + # 检查错误消息是否包含正常的令牌过期关键词 + for keyword in normal_expiry_keywords: + if keyword in error_message: + return True + + return False + + # 测试用例 + test_cases = [ + # 应该被过滤的消息(返回True) + ("Token刷新失败: {'ret': ['FAIL_SYS_TOKEN_EXOIRED::令牌过期']}", True, "标准令牌过期"), + ("Token刷新失败: {'ret': ['FAIL_SYS_TOKEN_EXPIRED::令牌过期']}", True, "标准令牌过期(EXPIRED)"), + ("Token刷新异常: FAIL_SYS_TOKEN_EXOIRED", True, "简单令牌过期"), + ("Token刷新异常: FAIL_SYS_TOKEN_EXPIRED", True, "简单令牌过期(EXPIRED)"), + ("Token刷新失败: 令牌过期", True, "中文令牌过期"), + ("其他错误信息包含FAIL_SYS_TOKEN_EXOIRED的情况", True, "包含关键词"), + + # 不应该被过滤的消息(返回False) + ("Token刷新失败: {'ret': ['FAIL_SYS_SESSION_EXPIRED::Session过期']}", False, "Session过期"), + ("Token刷新异常: 网络连接超时", False, "网络异常"), + ("Token刷新失败: Cookie无效", False, "Cookie问题"), + ("初始化时无法获取有效Token", False, "初始化失败"), + ("Token刷新失败: 未知错误", False, "未知错误"), + ("Token刷新失败: API调用失败", False, "API失败"), + ("", False, "空消息"), + ] + + print("📋 测试用例:") + print("-" * 50) + + passed = 0 + total = len(test_cases) + + for i, (message, expected, description) in enumerate(test_cases, 1): + result = _is_normal_token_expiry(message) + + if result == expected: + status = "✅ 通过" + passed += 1 + else: + status = "❌ 失败" + + filter_action = "过滤" if result else "不过滤" + expected_action = "过滤" if expected else "不过滤" + + print(f"{i:2d}. {status} {description}") + print(f" 消息: {message[:60]}{'...' if len(message) > 60 else ''}") + print(f" 结果: {filter_action} | 期望: {expected_action}") + print() + + print("=" * 50) + print(f"📊 测试结果: {passed}/{total} 通过") + + if passed == total: + print("🎉 所有测试通过!过滤逻辑工作正常") + return True + else: + print("⚠️ 部分测试失败,需要检查过滤逻辑") + return False + +def show_real_world_examples(): + """显示真实世界的例子""" + print("\n\n📋 真实场景示例") + print("=" * 50) + + print("🚫 以下情况将不再发送通知(被过滤):") + examples_filtered = [ + "Token刷新失败: {'api': 'mtop.taobao.idlemessage.pc.login.token', 'data': {}, 'ret': ['FAIL_SYS_TOKEN_EXOIRED::令牌过期'], 'v': '1.0'}", + "Token刷新异常: FAIL_SYS_TOKEN_EXPIRED", + "Token刷新失败: 令牌过期" + ] + + for i, example in enumerate(examples_filtered, 1): + print(f"{i}. {example}") + + print("\n✅ 以下情况仍会发送通知(不被过滤):") + examples_not_filtered = [ + "Token刷新失败: {'api': 'mtop.taobao.idlemessage.pc.login.token', 'data': {}, 'ret': ['FAIL_SYS_SESSION_EXPIRED::Session过期'], 'v': '1.0'}", + "Token刷新异常: 网络连接超时", + "初始化时无法获取有效Token" + ] + + for i, example in enumerate(examples_not_filtered, 1): + print(f"{i}. {example}") + + print("\n💡 设计理念:") + print("• 令牌过期是正常现象,系统会自动重试刷新") + print("• Session过期通常意味着Cookie过期,需要用户手动更新") + print("• 网络异常等其他错误也需要用户关注") + print("• 减少无用通知,提升用户体验") + +def show_implementation_details(): + """显示实现细节""" + print("\n\n🔧 实现细节") + print("=" * 50) + + print("📍 修改位置:") + print("• 文件: XianyuAutoAsync.py") + print("• 方法: send_token_refresh_notification()") + print("• 新增: _is_normal_token_expiry() 过滤方法") + + print("\n🔍 过滤关键词:") + keywords = [ + 'FAIL_SYS_TOKEN_EXOIRED::令牌过期', + 'FAIL_SYS_TOKEN_EXPIRED::令牌过期', + 'FAIL_SYS_TOKEN_EXOIRED', + 'FAIL_SYS_TOKEN_EXPIRED', + '令牌过期' + ] + + for keyword in keywords: + print(f"• {keyword}") + + print("\n⚡ 执行流程:") + print("1. 调用 send_token_refresh_notification()") + print("2. 检查 _is_normal_token_expiry(error_message)") + print("3. 如果是正常令牌过期,记录调试日志并返回") + print("4. 如果不是,继续原有的通知发送流程") + + print("\n📝 日志记录:") + print("• 被过滤的消息会记录调试日志") + print("• 格式: '检测到正常的令牌过期,跳过通知: {error_message}'") + print("• 便于问题排查和功能验证") + +if __name__ == "__main__": + try: + success = test_token_expiry_filter_logic() + show_real_world_examples() + show_implementation_details() + + print("\n" + "=" * 50) + if success: + print("🎊 令牌过期通知过滤功能测试完成!") + print("✅ 用户将不再收到正常令牌过期的通知") + else: + print("❌ 测试失败,需要检查实现") + + except Exception as e: + print(f"❌ 测试过程中发生错误: {e}") + import traceback + traceback.print_exc() diff --git a/test_token_expiry_filter.py b/test_token_expiry_filter.py new file mode 100644 index 0000000..ae6bb17 --- /dev/null +++ b/test_token_expiry_filter.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python3 +""" +测试令牌过期通知过滤功能 +验证正常的令牌过期不会发送通知 +""" + +import asyncio +import time +from unittest.mock import AsyncMock, patch, MagicMock +import sys +import os + +# 添加项目根目录到路径 +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +async def test_token_expiry_filter(): + """测试令牌过期通知过滤""" + print("🧪 测试令牌过期通知过滤功能") + print("=" * 60) + + # 动态导入 + try: + from XianyuAutoAsync import XianyuLive + print("✅ 成功导入 XianyuLive") + except Exception as e: + print(f"❌ 导入失败: {e}") + return + + # 创建测试实例 + test_cookies = "unb=test123; _m_h5_tk=test_token_123456789" + + try: + xianyu = XianyuLive(test_cookies, "test_account") + print("✅ XianyuLive 实例创建成功") + except Exception as e: + print(f"❌ 创建实例失败: {e}") + return + + # Mock外部依赖 + with patch('db_manager.db_manager') as mock_db: + # 配置数据库mock + mock_db.get_account_notifications.return_value = [ + { + 'enabled': True, + 'channel_type': 'qq', + 'channel_name': 'Test QQ', + 'channel_config': {'qq_number': '123456', 'api_url': 'http://test.com'} + } + ] + + # Mock QQ通知发送方法 + xianyu._send_qq_notification = AsyncMock() + + print("\n📋 测试用例设计") + print("-" * 40) + + # 测试用例:应该被过滤的错误消息(不发送通知) + filtered_messages = [ + "Token刷新失败: {'ret': ['FAIL_SYS_TOKEN_EXOIRED::令牌过期']}", + "Token刷新失败: {'ret': ['FAIL_SYS_TOKEN_EXPIRED::令牌过期']}", + "Token刷新异常: FAIL_SYS_TOKEN_EXOIRED", + "Token刷新异常: FAIL_SYS_TOKEN_EXPIRED", + "Token刷新失败: 令牌过期", + ] + + # 测试用例:不应该被过滤的错误消息(需要发送通知) + unfiltered_messages = [ + "Token刷新失败: {'ret': ['FAIL_SYS_SESSION_EXPIRED::Session过期']}", + "Token刷新异常: 网络连接超时", + "Token刷新失败: Cookie无效", + "初始化时无法获取有效Token", + "Token刷新失败: 未知错误" + ] + + print("🚫 应该被过滤的消息(不发送通知):") + for i, msg in enumerate(filtered_messages, 1): + print(f" {i}. {msg}") + + print("\n✅ 不应该被过滤的消息(需要发送通知):") + for i, msg in enumerate(unfiltered_messages, 1): + print(f" {i}. {msg}") + + print("\n" + "=" * 60) + print("🧪 开始测试") + + # 测试1: 验证过滤功能 + print("\n1️⃣ 测试令牌过期消息过滤...") + + filtered_count = 0 + for i, message in enumerate(filtered_messages, 1): + xianyu._send_qq_notification.reset_mock() + await xianyu.send_token_refresh_notification(message, f"test_filtered_{i}") + + if not xianyu._send_qq_notification.called: + print(f" ✅ 消息 {i} 被正确过滤") + filtered_count += 1 + else: + print(f" ❌ 消息 {i} 未被过滤(应该被过滤)") + + print(f"\n 📊 过滤结果: {filtered_count}/{len(filtered_messages)} 条消息被正确过滤") + + # 测试2: 验证非过滤消息正常发送 + print("\n2️⃣ 测试非令牌过期消息正常发送...") + + sent_count = 0 + for i, message in enumerate(unfiltered_messages, 1): + xianyu._send_qq_notification.reset_mock() + await xianyu.send_token_refresh_notification(message, f"test_unfiltered_{i}") + + if xianyu._send_qq_notification.called: + print(f" ✅ 消息 {i} 正常发送") + sent_count += 1 + else: + print(f" ❌ 消息 {i} 未发送(应该发送)") + + print(f"\n 📊 发送结果: {sent_count}/{len(unfiltered_messages)} 条消息正常发送") + + # 测试3: 验证过滤逻辑 + print("\n3️⃣ 测试过滤逻辑详情...") + + test_cases = [ + ("FAIL_SYS_TOKEN_EXOIRED::令牌过期", True), + ("FAIL_SYS_TOKEN_EXPIRED::令牌过期", True), + ("FAIL_SYS_TOKEN_EXOIRED", True), + ("FAIL_SYS_TOKEN_EXPIRED", True), + ("令牌过期", True), + ("FAIL_SYS_SESSION_EXPIRED::Session过期", False), + ("网络连接超时", False), + ("Cookie无效", False), + ] + + for message, should_be_filtered in test_cases: + is_filtered = xianyu._is_normal_token_expiry(message) + if is_filtered == should_be_filtered: + status = "✅ 正确" + else: + status = "❌ 错误" + + filter_status = "过滤" if is_filtered else "不过滤" + expected_status = "过滤" if should_be_filtered else "不过滤" + print(f" {status} '{message}' -> {filter_status} (期望: {expected_status})") + + # 总结 + print("\n" + "=" * 60) + print("📊 测试总结") + + total_filtered = len([msg for msg in filtered_messages if xianyu._is_normal_token_expiry(msg)]) + total_unfiltered = len([msg for msg in unfiltered_messages if not xianyu._is_normal_token_expiry(msg)]) + + print(f"✅ 令牌过期消息过滤: {total_filtered}/{len(filtered_messages)} 正确") + print(f"✅ 非令牌过期消息: {total_unfiltered}/{len(unfiltered_messages)} 正确") + + if total_filtered == len(filtered_messages) and total_unfiltered == len(unfiltered_messages): + print("🎉 所有测试通过!令牌过期通知过滤功能正常工作") + else: + print("⚠️ 部分测试失败,需要检查过滤逻辑") + +def show_filter_explanation(): + """显示过滤机制说明""" + print("\n\n📋 令牌过期通知过滤机制说明") + print("=" * 60) + + print("🎯 设计目标:") + print(" • 避免正常的令牌过期发送通知") + print(" • 令牌过期是正常现象,系统会自动重试") + print(" • 只有真正的异常才需要通知用户") + + print("\n🔍 过滤规则:") + print(" 以下关键词的错误消息将被过滤(不发送通知):") + print(" • FAIL_SYS_TOKEN_EXOIRED::令牌过期") + print(" • FAIL_SYS_TOKEN_EXPIRED::令牌过期") + print(" • FAIL_SYS_TOKEN_EXOIRED") + print(" • FAIL_SYS_TOKEN_EXPIRED") + print(" • 令牌过期") + + print("\n✅ 仍会发送通知的情况:") + print(" • FAIL_SYS_SESSION_EXPIRED::Session过期 (Cookie过期)") + print(" • 网络连接异常") + print(" • API调用失败") + print(" • 其他未知错误") + + print("\n💡 优势:") + print(" • 减少无用通知,避免用户困扰") + print(" • 保留重要异常通知,便于及时处理") + print(" • 提升用户体验,通知更有价值") + + print("\n🔧 实现方式:") + print(" • 在发送通知前检查错误消息") + print(" • 使用关键词匹配识别正常的令牌过期") + print(" • 记录调试日志,便于问题排查") + +if __name__ == "__main__": + try: + asyncio.run(test_token_expiry_filter()) + show_filter_explanation() + + print("\n" + "=" * 60) + print("🎊 令牌过期通知过滤测试完成!") + + except Exception as e: + print(f"❌ 测试过程中发生错误: {e}") + import traceback + traceback.print_exc()