优化发货逻辑

This commit is contained in:
zhinianboke 2025-08-02 21:45:50 +08:00
parent 789870b334
commit 8dee429a49
2 changed files with 327 additions and 323 deletions

View File

@ -101,11 +101,11 @@ class XianyuLive:
# 自动发货防重复机制
self.last_delivery_time = {} # 记录每个商品的最后发货时间
self.delivery_cooldown = 60 # 1分钟内不重复发货
self.delivery_cooldown = 600 # 10分钟内不重复发货
# 自动确认发货防重复机制
self.confirmed_orders = {} # 记录已确认发货的订单,防止重复确认
self.order_confirm_cooldown = 300 # 5分钟内不重复确认同一订单
self.order_confirm_cooldown = 600 # 10分钟内不重复确认同一订单
self.session = None # 用于API调用的aiohttp session
@ -144,6 +144,138 @@ class XianyuLive:
else:
logger.debug(f"{self.cookie_id}】无订单ID跳过发货标记")
def _is_auto_delivery_trigger(self, message: str) -> bool:
"""检查消息是否为自动发货触发关键字"""
# 定义所有自动发货触发关键字
auto_delivery_keywords = [
# 系统消息
'[我已付款,等待你发货]',
'[已付款,待发货]',
'我已付款,等待你发货',
'[记得及时发货]',
]
# 检查消息是否包含任何触发关键字
for keyword in auto_delivery_keywords:
if keyword in message:
return True
return False
def _extract_order_id(self, message: dict) -> str:
"""从消息中提取订单ID"""
try:
order_id = None
# 先查看消息的完整结构
logger.debug(f"{self.cookie_id}】🔍 完整消息结构: {message}")
# 检查message['1']的结构
message_1 = message.get('1', {})
logger.debug(f"{self.cookie_id}】🔍 message['1'] keys: {list(message_1.keys()) if message_1 else 'None'}")
# 检查message['1']['6']的结构
message_1_6 = message_1.get('6', {}) if message_1 else {}
logger.debug(f"{self.cookie_id}】🔍 message['1']['6'] keys: {list(message_1_6.keys()) if message_1_6 else 'None'}")
# 方法1: 从button的targetUrl中提取orderId
content_json_str = message.get('1', {}).get('6', {}).get('3', {}).get('5', '')
if content_json_str:
try:
content_data = json.loads(content_json_str)
# 方法1a: 从button的targetUrl中提取orderId
target_url = content_data.get('dxCard', {}).get('item', {}).get('main', {}).get('exContent', {}).get('button', {}).get('targetUrl', '')
if target_url:
# 从URL中提取orderId参数
order_match = re.search(r'orderId=(\d+)', target_url)
if order_match:
order_id = order_match.group(1)
logger.info(f'{self.cookie_id}】✅ 从button提取到订单ID: {order_id}')
# 方法1b: 从main的targetUrl中提取order_detail的id
if not order_id:
main_target_url = content_data.get('dxCard', {}).get('item', {}).get('main', {}).get('targetUrl', '')
if main_target_url:
order_match = re.search(r'order_detail\?id=(\d+)', main_target_url)
if order_match:
order_id = order_match.group(1)
logger.info(f'{self.cookie_id}】✅ 从main targetUrl提取到订单ID: {order_id}')
except Exception as parse_e:
logger.debug(f"解析内容JSON失败: {parse_e}")
# 方法2: 从dynamicOperation中的order_detail URL提取orderId
if not order_id and content_json_str:
try:
content_data = json.loads(content_json_str)
dynamic_target_url = content_data.get('dynamicOperation', {}).get('changeContent', {}).get('dxCard', {}).get('item', {}).get('main', {}).get('exContent', {}).get('button', {}).get('targetUrl', '')
if dynamic_target_url:
# 从order_detail URL中提取id参数
order_match = re.search(r'order_detail\?id=(\d+)', dynamic_target_url)
if order_match:
order_id = order_match.group(1)
logger.info(f'{self.cookie_id}】✅ 从order_detail提取到订单ID: {order_id}')
except Exception as parse_e:
logger.debug(f"解析dynamicOperation JSON失败: {parse_e}")
return order_id
except Exception as e:
logger.error(f"提取订单ID失败: {self._safe_str(e)}")
return None
async def _handle_auto_delivery(self, websocket, message: dict, send_user_name: str, send_user_id: str,
item_id: str, chat_id: str, msg_time: str):
"""统一处理自动发货逻辑"""
try:
# 提取订单ID
order_id = self._extract_order_id(message)
# 订单ID已提取将在自动发货时进行确认发货处理
if order_id:
logger.info(f'[{msg_time}] 【{self.cookie_id}】提取到订单ID: {order_id},将在自动发货时处理确认发货')
else:
logger.warning(f'[{msg_time}] 【{self.cookie_id}】❌ 未能提取到订单ID')
# 检查是否可以进行自动发货(防重复)- 基于订单ID
if not self.can_auto_delivery(order_id):
return
# 构造用户URL
user_url = f'https://www.goofish.com/personal?userId={send_user_id}'
# 自动发货逻辑
try:
# 设置默认标题将通过API获取真实商品信息
item_title = "待获取商品信息"
logger.info(f"{self.cookie_id}】准备自动发货: item_id={item_id}, item_title={item_title}")
# 调用自动发货方法(包含自动确认发货)
delivery_content = await self._auto_delivery(item_id, item_title, order_id)
if delivery_content:
# 标记已发货(防重复)- 基于订单ID
self.mark_delivery_sent(order_id)
# 发送发货内容给买家
await self.send_msg(websocket, chat_id, send_user_id, delivery_content)
logger.info(f'[{msg_time}] 【自动发货】已向 {user_url} 发送发货内容')
await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "发货成功")
else:
logger.warning(f'[{msg_time}] 【自动发货】未找到匹配的发货规则或获取发货内容失败')
# 发送自动发货失败通知
await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "未找到匹配的发货规则或获取发货内容失败")
except Exception as e:
logger.error(f"自动发货处理异常: {self._safe_str(e)}")
# 发送自动发货异常通知
await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, f"自动发货处理异常: {str(e)}")
except Exception as e:
logger.error(f"统一自动发货处理异常: {self._safe_str(e)}")
async def refresh_token(self):
@ -1634,6 +1766,8 @@ class XianyuLive:
logger.warning(f"⚠️ 自动确认发货失败: {confirm_result.get('error', '未知错误')}")
# 即使确认发货失败,也继续发送发货内容
# 检查是否存在订单ID只有存在订单ID才处理发货内容
if order_id:
# 开始处理发货内容
logger.info(f"开始处理发货内容,规则: {rule['keyword']} -> {rule['card_name']} ({rule['card_type']})")
@ -1663,6 +1797,10 @@ class XianyuLive:
else:
logger.warning(f"获取发货内容失败: 规则ID={rule['id']}")
return None
else:
# 没有订单ID记录日志但不处理发货内容
logger.info(f"⚠️ 未检测到订单ID跳过发货内容处理。规则: {rule['keyword']} -> {rule['card_name']} ({rule['card_type']})")
return None
except Exception as e:
logger.error(f"自动发货失败: {self._safe_str(e)}")
@ -2406,208 +2544,13 @@ class XianyuLive:
elif send_message == '[你已发货]':
logger.info(f'[{msg_time}] 【{self.cookie_id}】发货确认消息不处理')
return
elif send_message == '[我已付款,等待你发货]':
logger.info(f'[{msg_time}] 【{self.cookie_id}】【系统】买家已付款,准备自动发货')
# 提取orderId并打印
try:
order_id = None
# 先查看消息的完整结构
logger.info(f"{self.cookie_id}】🔍 完整消息结构: {message}")
# 检查message['1']的结构
message_1 = message.get('1', {})
logger.info(f"{self.cookie_id}】🔍 message['1'] keys: {list(message_1.keys()) if message_1 else 'None'}")
# 检查message['1']['6']的结构
message_1_6 = message_1.get('6', {}) if message_1 else {}
logger.info(f"{self.cookie_id}】🔍 message['1']['6'] keys: {list(message_1_6.keys()) if message_1_6 else 'None'}")
# 方法1: 从button的targetUrl中提取orderId
content_json_str = message.get('1', {}).get('6', {}).get('3', {}).get('5', '')
logger.info(f"{self.cookie_id}】🔍 content_json_str: {content_json_str[:200] if content_json_str else 'None'}...")
if content_json_str:
try:
content_data = json.loads(content_json_str)
logger.info(f"{self.cookie_id}】🔍 content_data keys: {list(content_data.keys())}")
# 方法1a: 从button的targetUrl中提取orderId
target_url = content_data.get('dxCard', {}).get('item', {}).get('main', {}).get('exContent', {}).get('button', {}).get('targetUrl', '')
logger.info(f"{self.cookie_id}】🔍 button targetUrl: {target_url}")
if target_url:
# 从URL中提取orderId参数
order_match = re.search(r'orderId=(\d+)', target_url)
if order_match:
order_id = order_match.group(1)
logger.info(f'[{msg_time}] 【{self.cookie_id}】✅ 从button提取到订单ID: {order_id}')
# 方法1b: 从main的targetUrl中提取order_detail的id
if not order_id:
main_target_url = content_data.get('dxCard', {}).get('item', {}).get('main', {}).get('targetUrl', '')
logger.info(f"{self.cookie_id}】🔍 main targetUrl: {main_target_url}")
if main_target_url:
order_match = re.search(r'order_detail\?id=(\d+)', main_target_url)
if order_match:
order_id = order_match.group(1)
logger.info(f'[{msg_time}] 【{self.cookie_id}】✅ 从main targetUrl提取到订单ID: {order_id}')
except Exception as parse_e:
logger.error(f"解析内容JSON失败: {parse_e}")
# 方法2: 从dynamicOperation中的order_detail URL提取orderId
if not order_id and content_json_str:
try:
content_data = json.loads(content_json_str)
dynamic_target_url = content_data.get('dynamicOperation', {}).get('changeContent', {}).get('dxCard', {}).get('item', {}).get('main', {}).get('exContent', {}).get('button', {}).get('targetUrl', '')
if dynamic_target_url:
# 从order_detail URL中提取id参数
order_match = re.search(r'order_detail\?id=(\d+)', dynamic_target_url)
if order_match:
order_id = order_match.group(1)
logger.info(f'[{msg_time}] 【{self.cookie_id}】✅ 从order_detail提取到订单ID: {order_id}')
except Exception as parse_e:
logger.debug(f"解析dynamicOperation JSON失败: {parse_e}")
# 订单ID已提取将在自动发货时进行确认发货处理
if order_id:
logger.info(f'[{msg_time}] 【{self.cookie_id}】提取到订单ID: {order_id},将在自动发货时处理确认发货')
else:
logger.warning(f'[{msg_time}] 【{self.cookie_id}】❌ 未能提取到订单ID')
except Exception as extract_e:
logger.error(f"提取订单ID失败: {self._safe_str(extract_e)}")
# 检查是否可以进行自动发货(防重复)- 基于订单ID
if not self.can_auto_delivery(order_id):
# 检查是否为自动发货触发消息
elif self._is_auto_delivery_trigger(send_message):
# 使用统一的自动发货处理方法
await self._handle_auto_delivery(websocket, message, send_user_name, send_user_id,
item_id, chat_id, msg_time)
return
# 构造用户URL
user_url = f'https://www.goofish.com/personal?userId={send_user_id}'
# 自动发货逻辑
try:
# 设置默认标题将通过API获取真实商品信息
item_title = "待获取商品信息"
logger.info(f"{self.cookie_id}】准备自动发货: item_id={item_id}, item_title={item_title}")
# 调用自动发货方法(包含自动确认发货)
delivery_content = await self._auto_delivery(item_id, item_title, order_id)
if delivery_content:
# 标记已发货(防重复)- 基于订单ID
self.mark_delivery_sent(order_id)
# 发送发货内容给买家
await self.send_msg(websocket, chat_id, send_user_id, delivery_content)
logger.info(f'[{msg_time}] 【自动发货】已向 {user_url} 发送发货内容')
await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "发货成功")
else:
logger.warning(f'[{msg_time}] 【自动发货】未找到匹配的发货规则或获取发货内容失败')
# 发送自动发货失败通知
await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "未找到匹配的发货规则或获取发货内容失败")
except Exception as e:
logger.error(f"自动发货处理异常: {self._safe_str(e)}")
# 发送自动发货异常通知
await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, f"自动发货处理异常: {str(e)}")
return
elif send_message == '[已付款,待发货]':
logger.info(f'[{msg_time}] 【{self.cookie_id}】【系统】买家已付款,准备自动发货')
# 提取orderId并打印
try:
order_id = None
# 方法1: 从button的targetUrl中提取orderId
content_json_str = message.get('1', {}).get('6', {}).get('3', {}).get('5', '')
if content_json_str:
try:
content_data = json.loads(content_json_str)
# 方法1a: 从button的targetUrl中提取orderId
target_url = content_data.get('dxCard', {}).get('item', {}).get('main', {}).get('exContent', {}).get('button', {}).get('targetUrl', '')
if target_url:
# 从URL中提取orderId参数
order_match = re.search(r'orderId=(\d+)', target_url)
if order_match:
order_id = order_match.group(1)
logger.info(f'[{msg_time}] 【{self.cookie_id}】✅ 从button提取到订单ID: {order_id}')
# 方法1b: 从main的targetUrl中提取order_detail的id
if not order_id:
main_target_url = content_data.get('dxCard', {}).get('item', {}).get('main', {}).get('targetUrl', '')
if main_target_url:
order_match = re.search(r'order_detail\?id=(\d+)', main_target_url)
if order_match:
order_id = order_match.group(1)
logger.info(f'[{msg_time}] 【{self.cookie_id}】✅ 从main targetUrl提取到订单ID: {order_id}')
except Exception as parse_e:
logger.debug(f"解析内容JSON失败: {parse_e}")
# 方法2: 从dynamicOperation中的order_detail URL提取orderId
if not order_id and content_json_str:
try:
content_data = json.loads(content_json_str)
dynamic_target_url = content_data.get('dynamicOperation', {}).get('changeContent', {}).get('dxCard', {}).get('item', {}).get('main', {}).get('exContent', {}).get('button', {}).get('targetUrl', '')
if dynamic_target_url:
# 从order_detail URL中提取id参数
order_match = re.search(r'order_detail\?id=(\d+)', dynamic_target_url)
if order_match:
order_id = order_match.group(1)
logger.info(f'[{msg_time}] 【{self.cookie_id}】✅ 从order_detail提取到订单ID: {order_id}')
except Exception as parse_e:
logger.debug(f"解析dynamicOperation JSON失败: {parse_e}")
# 订单ID已提取将在自动发货时进行确认发货处理
if order_id:
logger.info(f'[{msg_time}] 【{self.cookie_id}】提取到订单ID: {order_id},将在自动发货时处理确认发货')
else:
logger.warning(f'[{msg_time}] 【{self.cookie_id}】❌ 未能提取到订单ID')
except Exception as extract_e:
logger.error(f"提取订单ID失败: {self._safe_str(extract_e)}")
# 检查是否可以进行自动发货(防重复)- 基于订单ID
if not self.can_auto_delivery(order_id):
return
# 构造用户URL
user_url = f'https://www.goofish.com/personal?userId={send_user_id}'
# 自动发货逻辑
try:
# 设置默认标题将通过API获取真实商品信息
item_title = "待获取商品信息"
logger.info(f"{self.cookie_id}】准备自动发货: item_id={item_id}, item_title={item_title}")
# 调用自动发货方法(包含自动确认发货)
delivery_content = await self._auto_delivery(item_id, item_title, order_id)
if delivery_content:
# 标记已发货(防重复)- 基于订单ID
self.mark_delivery_sent(order_id)
# 发送发货内容给买家
await self.send_msg(websocket, chat_id, send_user_id, delivery_content)
logger.info(f'[{msg_time}] 【自动发货】已向 {user_url} 发送发货内容')
await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "发货成功")
else:
logger.warning(f'[{msg_time}] 【自动发货】未找到匹配的发货规则或获取发货内容失败')
# 发送自动发货失败通知
await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "未找到匹配的发货规则或获取发货内容失败")
except Exception as e:
logger.error(f"自动发货处理异常: {self._safe_str(e)}")
# 发送自动发货异常通知
await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, f"自动发货处理异常: {str(e)}")
return
elif send_message == '[卡片消息]':
# 检查是否为"我已小刀,待刀成"的卡片消息
try:
@ -2634,96 +2577,9 @@ class XianyuLive:
# 检查是否为"我已小刀,待刀成"
if card_title == "我已小刀,待刀成":
logger.info(f'[{msg_time}] 【{self.cookie_id}】【系统】检测到"我已小刀,待刀成",准备自动发货')
# 提取orderId并打印
try:
order_id = None
# 方法1: 从button的targetUrl中提取orderId
content_json_str = message.get('1', {}).get('6', {}).get('3', {}).get('5', '')
if content_json_str:
try:
content_data = json.loads(content_json_str)
# 方法1a: 从button的targetUrl中提取orderId
target_url = content_data.get('dxCard', {}).get('item', {}).get('main', {}).get('exContent', {}).get('button', {}).get('targetUrl', '')
if target_url:
# 从URL中提取orderId参数
order_match = re.search(r'orderId=(\d+)', target_url)
if order_match:
order_id = order_match.group(1)
logger.info(f'[{msg_time}] 【{self.cookie_id}】✅ 小刀成功从button提取到订单ID: {order_id}')
# 方法1b: 从main的targetUrl中提取order_detail的id
if not order_id:
main_target_url = content_data.get('dxCard', {}).get('item', {}).get('main', {}).get('targetUrl', '')
if main_target_url:
order_match = re.search(r'order_detail\?id=(\d+)', main_target_url)
if order_match:
order_id = order_match.group(1)
logger.info(f'[{msg_time}] 【{self.cookie_id}】✅ 小刀成功从main targetUrl提取到订单ID: {order_id}')
except Exception as parse_e:
logger.debug(f"解析内容JSON失败: {parse_e}")
# 方法2: 从dynamicOperation中的order_detail URL提取orderId
if not order_id and content_json_str:
try:
content_data = json.loads(content_json_str)
dynamic_target_url = content_data.get('dynamicOperation', {}).get('changeContent', {}).get('dxCard', {}).get('item', {}).get('main', {}).get('exContent', {}).get('button', {}).get('targetUrl', '')
if dynamic_target_url:
# 从order_detail URL中提取id参数
order_match = re.search(r'order_detail\?id=(\d+)', dynamic_target_url)
if order_match:
order_id = order_match.group(1)
logger.info(f'[{msg_time}] 【{self.cookie_id}】✅ 小刀成功从order_detail提取到订单ID: {order_id}')
except Exception as parse_e:
logger.debug(f"解析dynamicOperation JSON失败: {parse_e}")
# 订单ID已提取将在自动发货时进行确认发货处理
if order_id:
logger.info(f'[{msg_time}] 【{self.cookie_id}】小刀成功提取到订单ID: {order_id},将在自动发货时处理确认发货')
else:
logger.warning(f'[{msg_time}] 【{self.cookie_id}】❌ 小刀成功但未能提取到订单ID')
except Exception as extract_e:
logger.error(f"提取订单ID失败: {self._safe_str(extract_e)}")
# 检查是否可以进行自动发货(防重复)- 基于订单ID
if not self.can_auto_delivery(order_id):
return
# 构造用户URL
user_url = f'https://www.goofish.com/personal?userId={send_user_id}'
# 自动发货逻辑
try:
# 设置默认标题将通过API获取真实商品信息
item_title = "待获取商品信息"
logger.info(f"{self.cookie_id}】准备自动发货: item_id={item_id}, item_title={item_title}")
# 调用自动发货方法(包含自动确认发货)
delivery_content = await self._auto_delivery(item_id, item_title, order_id)
if delivery_content:
# 标记已发货(防重复)- 基于订单ID
self.mark_delivery_sent(order_id)
# 发送发货内容给买家
await self.send_msg(websocket, chat_id, send_user_id, delivery_content)
logger.info(f'[{msg_time}] 【自动发货】已向 {user_url} 发送发货内容')
await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "发货成功")
else:
logger.warning(f'[{msg_time}] 【自动发货】未找到匹配的发货规则或获取发货内容失败')
# 发送自动发货失败通知
await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "未找到匹配的发货规则或获取发货内容失败")
except Exception as e:
logger.error(f"自动发货处理异常: {self._safe_str(e)}")
# 发送自动发货异常通知
await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, f"自动发货处理异常: {str(e)}")
# 使用统一的自动发货处理方法
await self._handle_auto_delivery(websocket, message, send_user_name, send_user_id,
item_id, chat_id, msg_time)
return
else:
logger.info(f'[{msg_time}] 【{self.cookie_id}】收到卡片消息,标题: {card_title or "未知"}')

View File

@ -378,6 +378,8 @@ class DBManager:
self.set_system_setting("db_version", "1.2", "数据库版本号")
logger.info("数据库升级到版本1.2完成")
# 迁移遗留数据(在所有版本升级完成后执行)
self.migrate_legacy_data(cursor)
except Exception as e:
logger.error(f"数据库版本检查或升级失败: {e}")
@ -504,6 +506,9 @@ class DBManager:
cursor.execute("SELECT COUNT(*) FROM notification_channels")
count = cursor.fetchone()[0]
# 删除可能存在的临时表
cursor.execute("DROP TABLE IF EXISTS notification_channels_new")
# 创建临时表
cursor.execute('''
CREATE TABLE notification_channels_new (
@ -518,10 +523,53 @@ class DBManager:
)
''')
# 复制数据
# 复制数据,并转换不兼容的类型
if count > 0:
logger.info(f"复制 {count} 条通知渠道数据到新表")
cursor.execute("INSERT INTO notification_channels_new SELECT * FROM notification_channels")
# 先查看现有数据的类型
cursor.execute("SELECT DISTINCT type FROM notification_channels")
existing_types = [row[0] for row in cursor.fetchall()]
logger.info(f"现有通知渠道类型: {existing_types}")
# 获取所有现有数据进行逐行处理
cursor.execute("SELECT * FROM notification_channels")
existing_data = cursor.fetchall()
# 逐行转移数据,确保类型映射正确
for row in existing_data:
old_type = row[3] if len(row) > 3 else 'qq' # type字段默认为qq
# 类型映射规则
type_mapping = {
'dingtalk': 'ding_talk',
'ding_talk': 'ding_talk',
'qq': 'qq',
'email': 'qq', # 暂时映射为qq后续版本会支持
'webhook': 'qq', # 暂时映射为qq后续版本会支持
'wechat': 'qq', # 暂时映射为qq后续版本会支持
'telegram': 'qq' # 暂时映射为qq后续版本会支持
}
new_type = type_mapping.get(old_type, 'qq') # 默认转换为qq类型
if old_type != new_type:
logger.info(f"转换通知渠道类型: {old_type} -> {new_type}")
# 插入到新表
cursor.execute('''
INSERT INTO notification_channels_new
(id, name, user_id, type, config, enabled, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
row[0], # id
row[1], # name
row[2], # user_id
new_type, # type (转换后的)
row[4] if len(row) > 4 else '{}', # config
row[5] if len(row) > 5 else True, # enabled
row[6] if len(row) > 6 else None, # created_at
row[7] if len(row) > 7 else None # updated_at
))
# 删除旧表
cursor.execute("DROP TABLE notification_channels")
@ -575,17 +623,40 @@ class DBManager:
if existing_data:
logger.info(f"迁移 {len(existing_data)} 条通知渠道数据到新表")
for row in existing_data:
# 处理类型映射ding_talk -> dingtalk
channel_type = row[3] # type字段
if channel_type == 'ding_talk':
channel_type = 'dingtalk'
# 处理类型映射,支持更多渠道类型
old_type = row[3] if len(row) > 3 else 'qq' # type字段
# 插入到新表
# 扩展的类型映射规则
type_mapping = {
'ding_talk': 'dingtalk', # 统一为dingtalk
'dingtalk': 'dingtalk',
'qq': 'qq',
'email': 'email', # 现在支持email
'webhook': 'webhook', # 现在支持webhook
'wechat': 'wechat', # 现在支持wechat
'telegram': 'telegram' # 现在支持telegram
}
new_type = type_mapping.get(old_type, 'qq') # 默认为qq
if old_type != new_type:
logger.info(f"转换通知渠道类型: {old_type} -> {new_type}")
# 插入到新表,确保字段完整性
cursor.execute('''
INSERT INTO notification_channels_new
(id, name, user_id, type, config, enabled, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (row[0], row[1], row[2], channel_type, row[4], row[5], row[6], row[7]))
''', (
row[0], # id
row[1], # name
row[2], # user_id
new_type, # type (转换后的)
row[4] if len(row) > 4 else '{}', # config
row[5] if len(row) > 5 else True, # enabled
row[6] if len(row) > 6 else None, # created_at
row[7] if len(row) > 7 else None # updated_at
))
# 删除旧表
cursor.execute("DROP TABLE notification_channels")
@ -599,6 +670,83 @@ class DBManager:
logger.error(f"升级notification_channels表类型失败: {e}")
raise
def migrate_legacy_data(self, cursor):
"""迁移遗留数据到新表结构"""
try:
logger.info("开始检查和迁移遗留数据...")
# 检查是否有需要迁移的老表
legacy_tables = [
'old_notification_channels',
'legacy_delivery_rules',
'old_keywords',
'backup_cookies'
]
for table_name in legacy_tables:
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
if cursor.fetchone():
logger.info(f"发现遗留表: {table_name},开始迁移数据...")
self._migrate_table_data(cursor, table_name)
logger.info("遗留数据迁移完成")
return True
except Exception as e:
logger.error(f"迁移遗留数据失败: {e}")
return False
def _migrate_table_data(self, cursor, table_name: str):
"""迁移指定表的数据"""
try:
if table_name == 'old_notification_channels':
# 迁移通知渠道数据
cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
count = cursor.fetchone()[0]
if count > 0:
cursor.execute(f"SELECT * FROM {table_name}")
old_data = cursor.fetchall()
for row in old_data:
# 处理数据格式转换
cursor.execute('''
INSERT OR IGNORE INTO notification_channels
(name, user_id, type, config, enabled)
VALUES (?, ?, ?, ?, ?)
''', (
row[1] if len(row) > 1 else f"迁移渠道_{row[0]}",
row[2] if len(row) > 2 else 1, # 默认admin用户
self._normalize_channel_type(row[3] if len(row) > 3 else 'qq'),
row[4] if len(row) > 4 else '{}',
row[5] if len(row) > 5 else True
))
logger.info(f"成功迁移 {count} 条通知渠道数据")
# 迁移完成后删除老表
cursor.execute(f"DROP TABLE {table_name}")
logger.info(f"已删除遗留表: {table_name}")
except Exception as e:
logger.error(f"迁移表 {table_name} 数据失败: {e}")
def _normalize_channel_type(self, old_type: str) -> str:
"""标准化通知渠道类型"""
type_mapping = {
'ding_talk': 'dingtalk',
'dingtalk': 'dingtalk',
'qq': 'qq',
'email': 'email',
'webhook': 'webhook',
'wechat': 'wechat',
'telegram': 'telegram',
# 处理一些可能的变体
'dingding': 'dingtalk',
'weixin': 'wechat',
'tg': 'telegram'
}
return type_mapping.get(old_type.lower(), 'qq')
def _migrate_keywords_table_constraints(self, cursor):
"""迁移keywords表的约束支持基于商品ID的唯一性校验"""
try: