diff --git a/XianyuAutoAsync.py b/XianyuAutoAsync.py index 0f44fae..3bac499 100644 --- a/XianyuAutoAsync.py +++ b/XianyuAutoAsync.py @@ -101,11 +101,11 @@ class XianyuLive: # 自动发货防重复机制 self.last_delivery_time = {} # 记录每个商品的最后发货时间 - self.delivery_cooldown = 60 # 1分钟内不重复发货 + self.delivery_cooldown = 600 # 10分钟内不重复发货 # 自动确认发货防重复机制 self.confirmed_orders = {} # 记录已确认发货的订单,防止重复确认 - self.order_confirm_cooldown = 300 # 5分钟内不重复确认同一订单 + self.order_confirm_cooldown = 600 # 10分钟内不重复确认同一订单 self.session = None # 用于API调用的aiohttp session @@ -144,6 +144,138 @@ class XianyuLive: else: logger.debug(f"【{self.cookie_id}】无订单ID,跳过发货标记") + def _is_auto_delivery_trigger(self, message: str) -> bool: + """检查消息是否为自动发货触发关键字""" + # 定义所有自动发货触发关键字 + auto_delivery_keywords = [ + # 系统消息 + '[我已付款,等待你发货]', + '[已付款,待发货]', + '我已付款,等待你发货', + '[记得及时发货]', + ] + + # 检查消息是否包含任何触发关键字 + for keyword in auto_delivery_keywords: + if keyword in message: + return True + + return False + + def _extract_order_id(self, message: dict) -> str: + """从消息中提取订单ID""" + try: + order_id = None + + # 先查看消息的完整结构 + logger.debug(f"【{self.cookie_id}】🔍 完整消息结构: {message}") + + # 检查message['1']的结构 + message_1 = message.get('1', {}) + logger.debug(f"【{self.cookie_id}】🔍 message['1'] keys: {list(message_1.keys()) if message_1 else 'None'}") + + # 检查message['1']['6']的结构 + message_1_6 = message_1.get('6', {}) if message_1 else {} + logger.debug(f"【{self.cookie_id}】🔍 message['1']['6'] keys: {list(message_1_6.keys()) if message_1_6 else 'None'}") + + # 方法1: 从button的targetUrl中提取orderId + content_json_str = message.get('1', {}).get('6', {}).get('3', {}).get('5', '') + if content_json_str: + try: + content_data = json.loads(content_json_str) + + # 方法1a: 从button的targetUrl中提取orderId + target_url = content_data.get('dxCard', {}).get('item', {}).get('main', {}).get('exContent', {}).get('button', {}).get('targetUrl', '') + if target_url: + # 从URL中提取orderId参数 + order_match = re.search(r'orderId=(\d+)', target_url) + if order_match: + order_id = order_match.group(1) + logger.info(f'【{self.cookie_id}】✅ 从button提取到订单ID: {order_id}') + + # 方法1b: 从main的targetUrl中提取order_detail的id + if not order_id: + main_target_url = content_data.get('dxCard', {}).get('item', {}).get('main', {}).get('targetUrl', '') + if main_target_url: + order_match = re.search(r'order_detail\?id=(\d+)', main_target_url) + if order_match: + order_id = order_match.group(1) + logger.info(f'【{self.cookie_id}】✅ 从main targetUrl提取到订单ID: {order_id}') + + except Exception as parse_e: + logger.debug(f"解析内容JSON失败: {parse_e}") + + # 方法2: 从dynamicOperation中的order_detail URL提取orderId + if not order_id and content_json_str: + try: + content_data = json.loads(content_json_str) + dynamic_target_url = content_data.get('dynamicOperation', {}).get('changeContent', {}).get('dxCard', {}).get('item', {}).get('main', {}).get('exContent', {}).get('button', {}).get('targetUrl', '') + if dynamic_target_url: + # 从order_detail URL中提取id参数 + order_match = re.search(r'order_detail\?id=(\d+)', dynamic_target_url) + if order_match: + order_id = order_match.group(1) + logger.info(f'【{self.cookie_id}】✅ 从order_detail提取到订单ID: {order_id}') + except Exception as parse_e: + logger.debug(f"解析dynamicOperation JSON失败: {parse_e}") + + return order_id + + except Exception as e: + logger.error(f"提取订单ID失败: {self._safe_str(e)}") + return None + + async def _handle_auto_delivery(self, websocket, message: dict, send_user_name: str, send_user_id: str, + item_id: str, chat_id: str, msg_time: str): + """统一处理自动发货逻辑""" + try: + # 提取订单ID + order_id = self._extract_order_id(message) + + # 订单ID已提取,将在自动发货时进行确认发货处理 + if order_id: + logger.info(f'[{msg_time}] 【{self.cookie_id}】提取到订单ID: {order_id},将在自动发货时处理确认发货') + else: + logger.warning(f'[{msg_time}] 【{self.cookie_id}】❌ 未能提取到订单ID') + + # 检查是否可以进行自动发货(防重复)- 基于订单ID + if not self.can_auto_delivery(order_id): + return + + # 构造用户URL + user_url = f'https://www.goofish.com/personal?userId={send_user_id}' + + # 自动发货逻辑 + try: + # 设置默认标题(将通过API获取真实商品信息) + item_title = "待获取商品信息" + + logger.info(f"【{self.cookie_id}】准备自动发货: item_id={item_id}, item_title={item_title}") + + # 调用自动发货方法(包含自动确认发货) + delivery_content = await self._auto_delivery(item_id, item_title, order_id) + + if delivery_content: + # 标记已发货(防重复)- 基于订单ID + self.mark_delivery_sent(order_id) + + # 发送发货内容给买家 + await self.send_msg(websocket, chat_id, send_user_id, delivery_content) + logger.info(f'[{msg_time}] 【自动发货】已向 {user_url} 发送发货内容') + await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "发货成功") + else: + logger.warning(f'[{msg_time}] 【自动发货】未找到匹配的发货规则或获取发货内容失败') + # 发送自动发货失败通知 + await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "未找到匹配的发货规则或获取发货内容失败") + + except Exception as e: + logger.error(f"自动发货处理异常: {self._safe_str(e)}") + # 发送自动发货异常通知 + await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, f"自动发货处理异常: {str(e)}") + + except Exception as e: + logger.error(f"统一自动发货处理异常: {self._safe_str(e)}") + async def refresh_token(self): @@ -1634,34 +1766,40 @@ class XianyuLive: logger.warning(f"⚠️ 自动确认发货失败: {confirm_result.get('error', '未知错误')}") # 即使确认发货失败,也继续发送发货内容 - # 开始处理发货内容 - logger.info(f"开始处理发货内容,规则: {rule['keyword']} -> {rule['card_name']} ({rule['card_type']})") + # 检查是否存在订单ID,只有存在订单ID才处理发货内容 + if order_id: + # 开始处理发货内容 + logger.info(f"开始处理发货内容,规则: {rule['keyword']} -> {rule['card_name']} ({rule['card_type']})") - delivery_content = None + delivery_content = None - # 根据卡券类型处理发货内容 - if rule['card_type'] == 'api': - # API类型:调用API获取内容 - delivery_content = await self._get_api_card_content(rule) + # 根据卡券类型处理发货内容 + if rule['card_type'] == 'api': + # API类型:调用API获取内容 + delivery_content = await self._get_api_card_content(rule) - elif rule['card_type'] == 'text': - # 固定文字类型:直接使用文字内容 - delivery_content = rule['text_content'] + elif rule['card_type'] == 'text': + # 固定文字类型:直接使用文字内容 + delivery_content = rule['text_content'] - elif rule['card_type'] == 'data': - # 批量数据类型:获取并消费第一条数据 - delivery_content = db_manager.consume_batch_data(rule['card_id']) + elif rule['card_type'] == 'data': + # 批量数据类型:获取并消费第一条数据 + delivery_content = db_manager.consume_batch_data(rule['card_id']) - if delivery_content: - # 处理备注信息和变量替换 - final_content = self._process_delivery_content_with_description(delivery_content, rule.get('card_description', '')) + if delivery_content: + # 处理备注信息和变量替换 + final_content = self._process_delivery_content_with_description(delivery_content, rule.get('card_description', '')) - # 增加发货次数统计 - db_manager.increment_delivery_times(rule['id']) - logger.info(f"自动发货成功: 规则ID={rule['id']}, 内容长度={len(final_content)}") - return final_content + # 增加发货次数统计 + db_manager.increment_delivery_times(rule['id']) + logger.info(f"自动发货成功: 规则ID={rule['id']}, 内容长度={len(final_content)}") + return final_content + else: + logger.warning(f"获取发货内容失败: 规则ID={rule['id']}") + return None else: - logger.warning(f"获取发货内容失败: 规则ID={rule['id']}") + # 没有订单ID,记录日志但不处理发货内容 + logger.info(f"⚠️ 未检测到订单ID,跳过发货内容处理。规则: {rule['keyword']} -> {rule['card_name']} ({rule['card_type']})") return None except Exception as e: @@ -2406,208 +2544,13 @@ class XianyuLive: elif send_message == '[你已发货]': logger.info(f'[{msg_time}] 【{self.cookie_id}】发货确认消息不处理') return - elif send_message == '[我已付款,等待你发货]': - logger.info(f'[{msg_time}] 【{self.cookie_id}】【系统】买家已付款,准备自动发货') - - # 提取orderId并打印 - try: - order_id = None - - # 先查看消息的完整结构 - logger.info(f"【{self.cookie_id}】🔍 完整消息结构: {message}") - - # 检查message['1']的结构 - message_1 = message.get('1', {}) - logger.info(f"【{self.cookie_id}】🔍 message['1'] keys: {list(message_1.keys()) if message_1 else 'None'}") - - # 检查message['1']['6']的结构 - message_1_6 = message_1.get('6', {}) if message_1 else {} - logger.info(f"【{self.cookie_id}】🔍 message['1']['6'] keys: {list(message_1_6.keys()) if message_1_6 else 'None'}") - - # 方法1: 从button的targetUrl中提取orderId - content_json_str = message.get('1', {}).get('6', {}).get('3', {}).get('5', '') - logger.info(f"【{self.cookie_id}】🔍 content_json_str: {content_json_str[:200] if content_json_str else 'None'}...") - - if content_json_str: - try: - content_data = json.loads(content_json_str) - logger.info(f"【{self.cookie_id}】🔍 content_data keys: {list(content_data.keys())}") - - # 方法1a: 从button的targetUrl中提取orderId - target_url = content_data.get('dxCard', {}).get('item', {}).get('main', {}).get('exContent', {}).get('button', {}).get('targetUrl', '') - logger.info(f"【{self.cookie_id}】🔍 button targetUrl: {target_url}") - if target_url: - # 从URL中提取orderId参数 - order_match = re.search(r'orderId=(\d+)', target_url) - if order_match: - order_id = order_match.group(1) - logger.info(f'[{msg_time}] 【{self.cookie_id}】✅ 从button提取到订单ID: {order_id}') - - # 方法1b: 从main的targetUrl中提取order_detail的id - if not order_id: - main_target_url = content_data.get('dxCard', {}).get('item', {}).get('main', {}).get('targetUrl', '') - logger.info(f"【{self.cookie_id}】🔍 main targetUrl: {main_target_url}") - if main_target_url: - order_match = re.search(r'order_detail\?id=(\d+)', main_target_url) - if order_match: - order_id = order_match.group(1) - logger.info(f'[{msg_time}] 【{self.cookie_id}】✅ 从main targetUrl提取到订单ID: {order_id}') - - except Exception as parse_e: - logger.error(f"解析内容JSON失败: {parse_e}") - - # 方法2: 从dynamicOperation中的order_detail URL提取orderId - if not order_id and content_json_str: - try: - content_data = json.loads(content_json_str) - dynamic_target_url = content_data.get('dynamicOperation', {}).get('changeContent', {}).get('dxCard', {}).get('item', {}).get('main', {}).get('exContent', {}).get('button', {}).get('targetUrl', '') - if dynamic_target_url: - # 从order_detail URL中提取id参数 - order_match = re.search(r'order_detail\?id=(\d+)', dynamic_target_url) - if order_match: - order_id = order_match.group(1) - logger.info(f'[{msg_time}] 【{self.cookie_id}】✅ 从order_detail提取到订单ID: {order_id}') - except Exception as parse_e: - logger.debug(f"解析dynamicOperation JSON失败: {parse_e}") - - # 订单ID已提取,将在自动发货时进行确认发货处理 - if order_id: - logger.info(f'[{msg_time}] 【{self.cookie_id}】提取到订单ID: {order_id},将在自动发货时处理确认发货') - else: - logger.warning(f'[{msg_time}] 【{self.cookie_id}】❌ 未能提取到订单ID') - - except Exception as extract_e: - logger.error(f"提取订单ID失败: {self._safe_str(extract_e)}") - - # 检查是否可以进行自动发货(防重复)- 基于订单ID - if not self.can_auto_delivery(order_id): - return - - # 构造用户URL - user_url = f'https://www.goofish.com/personal?userId={send_user_id}' - - # 自动发货逻辑 - try: - # 设置默认标题(将通过API获取真实商品信息) - item_title = "待获取商品信息" - - logger.info(f"【{self.cookie_id}】准备自动发货: item_id={item_id}, item_title={item_title}") - - # 调用自动发货方法(包含自动确认发货) - delivery_content = await self._auto_delivery(item_id, item_title, order_id) - - if delivery_content: - # 标记已发货(防重复)- 基于订单ID - self.mark_delivery_sent(order_id) - - # 发送发货内容给买家 - await self.send_msg(websocket, chat_id, send_user_id, delivery_content) - logger.info(f'[{msg_time}] 【自动发货】已向 {user_url} 发送发货内容') - await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "发货成功") - else: - logger.warning(f'[{msg_time}] 【自动发货】未找到匹配的发货规则或获取发货内容失败') - # 发送自动发货失败通知 - await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "未找到匹配的发货规则或获取发货内容失败") - - except Exception as e: - logger.error(f"自动发货处理异常: {self._safe_str(e)}") - # 发送自动发货异常通知 - await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, f"自动发货处理异常: {str(e)}") - + # 检查是否为自动发货触发消息 + elif self._is_auto_delivery_trigger(send_message): + # 使用统一的自动发货处理方法 + await self._handle_auto_delivery(websocket, message, send_user_name, send_user_id, + item_id, chat_id, msg_time) return - elif send_message == '[已付款,待发货]': - logger.info(f'[{msg_time}] 【{self.cookie_id}】【系统】买家已付款,准备自动发货') - # 提取orderId并打印 - try: - order_id = None - - # 方法1: 从button的targetUrl中提取orderId - content_json_str = message.get('1', {}).get('6', {}).get('3', {}).get('5', '') - if content_json_str: - try: - content_data = json.loads(content_json_str) - - # 方法1a: 从button的targetUrl中提取orderId - target_url = content_data.get('dxCard', {}).get('item', {}).get('main', {}).get('exContent', {}).get('button', {}).get('targetUrl', '') - if target_url: - # 从URL中提取orderId参数 - order_match = re.search(r'orderId=(\d+)', target_url) - if order_match: - order_id = order_match.group(1) - logger.info(f'[{msg_time}] 【{self.cookie_id}】✅ 从button提取到订单ID: {order_id}') - - # 方法1b: 从main的targetUrl中提取order_detail的id - if not order_id: - main_target_url = content_data.get('dxCard', {}).get('item', {}).get('main', {}).get('targetUrl', '') - if main_target_url: - order_match = re.search(r'order_detail\?id=(\d+)', main_target_url) - if order_match: - order_id = order_match.group(1) - logger.info(f'[{msg_time}] 【{self.cookie_id}】✅ 从main targetUrl提取到订单ID: {order_id}') - - except Exception as parse_e: - logger.debug(f"解析内容JSON失败: {parse_e}") - - # 方法2: 从dynamicOperation中的order_detail URL提取orderId - if not order_id and content_json_str: - try: - content_data = json.loads(content_json_str) - dynamic_target_url = content_data.get('dynamicOperation', {}).get('changeContent', {}).get('dxCard', {}).get('item', {}).get('main', {}).get('exContent', {}).get('button', {}).get('targetUrl', '') - if dynamic_target_url: - # 从order_detail URL中提取id参数 - order_match = re.search(r'order_detail\?id=(\d+)', dynamic_target_url) - if order_match: - order_id = order_match.group(1) - logger.info(f'[{msg_time}] 【{self.cookie_id}】✅ 从order_detail提取到订单ID: {order_id}') - except Exception as parse_e: - logger.debug(f"解析dynamicOperation JSON失败: {parse_e}") - - # 订单ID已提取,将在自动发货时进行确认发货处理 - if order_id: - logger.info(f'[{msg_time}] 【{self.cookie_id}】提取到订单ID: {order_id},将在自动发货时处理确认发货') - else: - logger.warning(f'[{msg_time}] 【{self.cookie_id}】❌ 未能提取到订单ID') - - except Exception as extract_e: - logger.error(f"提取订单ID失败: {self._safe_str(extract_e)}") - - # 检查是否可以进行自动发货(防重复)- 基于订单ID - if not self.can_auto_delivery(order_id): - return - - # 构造用户URL - user_url = f'https://www.goofish.com/personal?userId={send_user_id}' - - # 自动发货逻辑 - try: - # 设置默认标题(将通过API获取真实商品信息) - item_title = "待获取商品信息" - - logger.info(f"【{self.cookie_id}】准备自动发货: item_id={item_id}, item_title={item_title}") - - # 调用自动发货方法(包含自动确认发货) - delivery_content = await self._auto_delivery(item_id, item_title, order_id) - - if delivery_content: - # 标记已发货(防重复)- 基于订单ID - self.mark_delivery_sent(order_id) - - # 发送发货内容给买家 - await self.send_msg(websocket, chat_id, send_user_id, delivery_content) - logger.info(f'[{msg_time}] 【自动发货】已向 {user_url} 发送发货内容') - await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "发货成功") - else: - logger.warning(f'[{msg_time}] 【自动发货】未找到匹配的发货规则或获取发货内容失败') - # 发送自动发货失败通知 - await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "未找到匹配的发货规则或获取发货内容失败") - - except Exception as e: - logger.error(f"自动发货处理异常: {self._safe_str(e)}") - # 发送自动发货异常通知 - await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, f"自动发货处理异常: {str(e)}") - - return elif send_message == '[卡片消息]': # 检查是否为"我已小刀,待刀成"的卡片消息 try: @@ -2634,96 +2577,9 @@ class XianyuLive: # 检查是否为"我已小刀,待刀成" if card_title == "我已小刀,待刀成": logger.info(f'[{msg_time}] 【{self.cookie_id}】【系统】检测到"我已小刀,待刀成",准备自动发货') - - # 提取orderId并打印 - try: - order_id = None - - # 方法1: 从button的targetUrl中提取orderId - content_json_str = message.get('1', {}).get('6', {}).get('3', {}).get('5', '') - if content_json_str: - try: - content_data = json.loads(content_json_str) - - # 方法1a: 从button的targetUrl中提取orderId - target_url = content_data.get('dxCard', {}).get('item', {}).get('main', {}).get('exContent', {}).get('button', {}).get('targetUrl', '') - if target_url: - # 从URL中提取orderId参数 - order_match = re.search(r'orderId=(\d+)', target_url) - if order_match: - order_id = order_match.group(1) - logger.info(f'[{msg_time}] 【{self.cookie_id}】✅ 小刀成功,从button提取到订单ID: {order_id}') - - # 方法1b: 从main的targetUrl中提取order_detail的id - if not order_id: - main_target_url = content_data.get('dxCard', {}).get('item', {}).get('main', {}).get('targetUrl', '') - if main_target_url: - order_match = re.search(r'order_detail\?id=(\d+)', main_target_url) - if order_match: - order_id = order_match.group(1) - logger.info(f'[{msg_time}] 【{self.cookie_id}】✅ 小刀成功,从main targetUrl提取到订单ID: {order_id}') - - except Exception as parse_e: - logger.debug(f"解析内容JSON失败: {parse_e}") - - # 方法2: 从dynamicOperation中的order_detail URL提取orderId - if not order_id and content_json_str: - try: - content_data = json.loads(content_json_str) - dynamic_target_url = content_data.get('dynamicOperation', {}).get('changeContent', {}).get('dxCard', {}).get('item', {}).get('main', {}).get('exContent', {}).get('button', {}).get('targetUrl', '') - if dynamic_target_url: - # 从order_detail URL中提取id参数 - order_match = re.search(r'order_detail\?id=(\d+)', dynamic_target_url) - if order_match: - order_id = order_match.group(1) - logger.info(f'[{msg_time}] 【{self.cookie_id}】✅ 小刀成功,从order_detail提取到订单ID: {order_id}') - except Exception as parse_e: - logger.debug(f"解析dynamicOperation JSON失败: {parse_e}") - - # 订单ID已提取,将在自动发货时进行确认发货处理 - if order_id: - logger.info(f'[{msg_time}] 【{self.cookie_id}】小刀成功,提取到订单ID: {order_id},将在自动发货时处理确认发货') - else: - logger.warning(f'[{msg_time}] 【{self.cookie_id}】❌ 小刀成功但未能提取到订单ID') - - except Exception as extract_e: - logger.error(f"提取订单ID失败: {self._safe_str(extract_e)}") - - # 检查是否可以进行自动发货(防重复)- 基于订单ID - if not self.can_auto_delivery(order_id): - return - - # 构造用户URL - user_url = f'https://www.goofish.com/personal?userId={send_user_id}' - - # 自动发货逻辑 - try: - # 设置默认标题(将通过API获取真实商品信息) - item_title = "待获取商品信息" - - logger.info(f"【{self.cookie_id}】准备自动发货: item_id={item_id}, item_title={item_title}") - - # 调用自动发货方法(包含自动确认发货) - delivery_content = await self._auto_delivery(item_id, item_title, order_id) - - if delivery_content: - # 标记已发货(防重复)- 基于订单ID - self.mark_delivery_sent(order_id) - - # 发送发货内容给买家 - await self.send_msg(websocket, chat_id, send_user_id, delivery_content) - logger.info(f'[{msg_time}] 【自动发货】已向 {user_url} 发送发货内容') - await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "发货成功") - else: - logger.warning(f'[{msg_time}] 【自动发货】未找到匹配的发货规则或获取发货内容失败') - # 发送自动发货失败通知 - await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "未找到匹配的发货规则或获取发货内容失败") - - except Exception as e: - logger.error(f"自动发货处理异常: {self._safe_str(e)}") - # 发送自动发货异常通知 - await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, f"自动发货处理异常: {str(e)}") - + # 使用统一的自动发货处理方法 + await self._handle_auto_delivery(websocket, message, send_user_name, send_user_id, + item_id, chat_id, msg_time) return else: logger.info(f'[{msg_time}] 【{self.cookie_id}】收到卡片消息,标题: {card_title or "未知"}') diff --git a/db_manager.py b/db_manager.py index 941c554..df71596 100644 --- a/db_manager.py +++ b/db_manager.py @@ -378,7 +378,9 @@ class DBManager: self.set_system_setting("db_version", "1.2", "数据库版本号") logger.info("数据库升级到版本1.2完成") - + # 迁移遗留数据(在所有版本升级完成后执行) + self.migrate_legacy_data(cursor) + except Exception as e: logger.error(f"数据库版本检查或升级失败: {e}") raise @@ -503,7 +505,10 @@ class DBManager: # 检查表中是否有数据 cursor.execute("SELECT COUNT(*) FROM notification_channels") count = cursor.fetchone()[0] - + + # 删除可能存在的临时表 + cursor.execute("DROP TABLE IF EXISTS notification_channels_new") + # 创建临时表 cursor.execute(''' CREATE TABLE notification_channels_new ( @@ -518,10 +523,53 @@ class DBManager: ) ''') - # 复制数据 + # 复制数据,并转换不兼容的类型 if count > 0: logger.info(f"复制 {count} 条通知渠道数据到新表") - cursor.execute("INSERT INTO notification_channels_new SELECT * FROM notification_channels") + # 先查看现有数据的类型 + cursor.execute("SELECT DISTINCT type FROM notification_channels") + existing_types = [row[0] for row in cursor.fetchall()] + logger.info(f"现有通知渠道类型: {existing_types}") + + # 获取所有现有数据进行逐行处理 + cursor.execute("SELECT * FROM notification_channels") + existing_data = cursor.fetchall() + + # 逐行转移数据,确保类型映射正确 + for row in existing_data: + old_type = row[3] if len(row) > 3 else 'qq' # type字段,默认为qq + + # 类型映射规则 + type_mapping = { + 'dingtalk': 'ding_talk', + 'ding_talk': 'ding_talk', + 'qq': 'qq', + 'email': 'qq', # 暂时映射为qq,后续版本会支持 + 'webhook': 'qq', # 暂时映射为qq,后续版本会支持 + 'wechat': 'qq', # 暂时映射为qq,后续版本会支持 + 'telegram': 'qq' # 暂时映射为qq,后续版本会支持 + } + + new_type = type_mapping.get(old_type, 'qq') # 默认转换为qq类型 + + if old_type != new_type: + logger.info(f"转换通知渠道类型: {old_type} -> {new_type}") + + # 插入到新表 + cursor.execute(''' + INSERT INTO notification_channels_new + (id, name, user_id, type, config, enabled, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ''', ( + row[0], # id + row[1], # name + row[2], # user_id + new_type, # type (转换后的) + row[4] if len(row) > 4 else '{}', # config + row[5] if len(row) > 5 else True, # enabled + row[6] if len(row) > 6 else None, # created_at + row[7] if len(row) > 7 else None # updated_at + )) # 删除旧表 cursor.execute("DROP TABLE notification_channels") @@ -575,17 +623,40 @@ class DBManager: if existing_data: logger.info(f"迁移 {len(existing_data)} 条通知渠道数据到新表") for row in existing_data: - # 处理类型映射:ding_talk -> dingtalk - channel_type = row[3] # type字段 - if channel_type == 'ding_talk': - channel_type = 'dingtalk' + # 处理类型映射,支持更多渠道类型 + old_type = row[3] if len(row) > 3 else 'qq' # type字段 - # 插入到新表 + # 扩展的类型映射规则 + type_mapping = { + 'ding_talk': 'dingtalk', # 统一为dingtalk + 'dingtalk': 'dingtalk', + 'qq': 'qq', + 'email': 'email', # 现在支持email + 'webhook': 'webhook', # 现在支持webhook + 'wechat': 'wechat', # 现在支持wechat + 'telegram': 'telegram' # 现在支持telegram + } + + new_type = type_mapping.get(old_type, 'qq') # 默认为qq + + if old_type != new_type: + logger.info(f"转换通知渠道类型: {old_type} -> {new_type}") + + # 插入到新表,确保字段完整性 cursor.execute(''' INSERT INTO notification_channels_new (id, name, user_id, type, config, enabled, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) - ''', (row[0], row[1], row[2], channel_type, row[4], row[5], row[6], row[7])) + ''', ( + row[0], # id + row[1], # name + row[2], # user_id + new_type, # type (转换后的) + row[4] if len(row) > 4 else '{}', # config + row[5] if len(row) > 5 else True, # enabled + row[6] if len(row) > 6 else None, # created_at + row[7] if len(row) > 7 else None # updated_at + )) # 删除旧表 cursor.execute("DROP TABLE notification_channels") @@ -598,6 +669,83 @@ class DBManager: except Exception as e: logger.error(f"升级notification_channels表类型失败: {e}") raise + + def migrate_legacy_data(self, cursor): + """迁移遗留数据到新表结构""" + try: + logger.info("开始检查和迁移遗留数据...") + + # 检查是否有需要迁移的老表 + legacy_tables = [ + 'old_notification_channels', + 'legacy_delivery_rules', + 'old_keywords', + 'backup_cookies' + ] + + for table_name in legacy_tables: + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,)) + if cursor.fetchone(): + logger.info(f"发现遗留表: {table_name},开始迁移数据...") + self._migrate_table_data(cursor, table_name) + + logger.info("遗留数据迁移完成") + return True + except Exception as e: + logger.error(f"迁移遗留数据失败: {e}") + return False + + def _migrate_table_data(self, cursor, table_name: str): + """迁移指定表的数据""" + try: + if table_name == 'old_notification_channels': + # 迁移通知渠道数据 + cursor.execute(f"SELECT COUNT(*) FROM {table_name}") + count = cursor.fetchone()[0] + + if count > 0: + cursor.execute(f"SELECT * FROM {table_name}") + old_data = cursor.fetchall() + + for row in old_data: + # 处理数据格式转换 + cursor.execute(''' + INSERT OR IGNORE INTO notification_channels + (name, user_id, type, config, enabled) + VALUES (?, ?, ?, ?, ?) + ''', ( + row[1] if len(row) > 1 else f"迁移渠道_{row[0]}", + row[2] if len(row) > 2 else 1, # 默认admin用户 + self._normalize_channel_type(row[3] if len(row) > 3 else 'qq'), + row[4] if len(row) > 4 else '{}', + row[5] if len(row) > 5 else True + )) + + logger.info(f"成功迁移 {count} 条通知渠道数据") + + # 迁移完成后删除老表 + cursor.execute(f"DROP TABLE {table_name}") + logger.info(f"已删除遗留表: {table_name}") + + except Exception as e: + logger.error(f"迁移表 {table_name} 数据失败: {e}") + + def _normalize_channel_type(self, old_type: str) -> str: + """标准化通知渠道类型""" + type_mapping = { + 'ding_talk': 'dingtalk', + 'dingtalk': 'dingtalk', + 'qq': 'qq', + 'email': 'email', + 'webhook': 'webhook', + 'wechat': 'wechat', + 'telegram': 'telegram', + # 处理一些可能的变体 + 'dingding': 'dingtalk', + 'weixin': 'wechat', + 'tg': 'telegram' + } + return type_mapping.get(old_type.lower(), 'qq') def _migrate_keywords_table_constraints(self, cursor): """迁移keywords表的约束,支持基于商品ID的唯一性校验"""