优化自动发货,防止漏单;优化token刷新机制

This commit is contained in:
zhinianboke 2025-08-10 12:48:49 +08:00
parent 73768a0c3f
commit b508c3e858
3 changed files with 504 additions and 353 deletions

View File

@ -12,14 +12,13 @@ from utils.xianyu_utils import (
) )
from config import ( from config import (
WEBSOCKET_URL, HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT, WEBSOCKET_URL, HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT,
TOKEN_REFRESH_INTERVAL, TOKEN_RETRY_INTERVAL, config, COOKIES_STR, TOKEN_REFRESH_INTERVAL, TOKEN_RETRY_INTERVAL, COOKIES_STR,
LOG_CONFIG, AUTO_REPLY, DEFAULT_HEADERS, WEBSOCKET_HEADERS, LOG_CONFIG, AUTO_REPLY, DEFAULT_HEADERS, WEBSOCKET_HEADERS,
APP_CONFIG, API_ENDPOINTS APP_CONFIG, API_ENDPOINTS
) )
from utils.message_utils import format_message, format_system_message
from utils.ws_utils import WebSocketClient
import sys import sys
import aiohttp import aiohttp
from collections import defaultdict
class AutoReplyPauseManager: class AutoReplyPauseManager:
@ -108,6 +107,18 @@ logger.add(
) )
class XianyuLive: class XianyuLive:
# 类级别的锁字典为每个order_id维护一个锁用于自动发货
_order_locks = defaultdict(lambda: asyncio.Lock())
# 记录锁的最后使用时间,用于清理
_lock_usage_times = {}
# 记录锁的持有状态和释放时间 {lock_key: {'locked': bool, 'release_time': float, 'task': asyncio.Task}}
_lock_hold_info = {}
# 独立的锁字典,用于订单详情获取(不使用延迟锁机制)
_order_detail_locks = defaultdict(lambda: asyncio.Lock())
# 记录订单详情锁的使用时间
_order_detail_lock_times = {}
def _safe_str(self, e): def _safe_str(self, e):
"""安全地将异常转换为字符串""" """安全地将异常转换为字符串"""
try: try:
@ -172,6 +183,8 @@ class XianyuLive:
self.confirmed_orders = {} # 记录已确认发货的订单,防止重复确认 self.confirmed_orders = {} # 记录已确认发货的订单,防止重复确认
self.order_confirm_cooldown = 600 # 10分钟内不重复确认同一订单 self.order_confirm_cooldown = 600 # 10分钟内不重复确认同一订单
# 自动发货已发送订单记录
self.delivery_sent_orders = set() # 记录已发货的订单ID防止重复发货
self.session = None # 用于API调用的aiohttp session self.session = None # 用于API调用的aiohttp session
@ -205,12 +218,111 @@ class XianyuLive:
return True return True
def mark_delivery_sent(self, order_id: str): def mark_delivery_sent(self, order_id: str):
"""标记订单已发货 - 基于订单ID""" """标记订单已发货"""
if order_id: self.delivery_sent_orders.add(order_id)
self.last_delivery_time[order_id] = time.time() logger.info(f"{self.cookie_id}】订单 {order_id} 已标记为发货")
logger.debug(f"{self.cookie_id}】标记订单 {order_id} 已发货")
else: async def _delayed_lock_release(self, lock_key: str, delay_minutes: int = 10):
logger.debug(f"{self.cookie_id}】无订单ID跳过发货标记") """
延迟释放锁的异步任务
Args:
lock_key: 锁的键
delay_minutes: 延迟时间分钟默认10分钟
"""
try:
delay_seconds = delay_minutes * 60
logger.info(f"{self.cookie_id}】订单锁 {lock_key} 将在 {delay_minutes} 分钟后释放")
# 等待指定时间
await asyncio.sleep(delay_seconds)
# 检查锁是否仍然存在且需要释放
if lock_key in self._lock_hold_info:
lock_info = self._lock_hold_info[lock_key]
if lock_info.get('locked', False):
# 释放锁
lock_info['locked'] = False
lock_info['release_time'] = time.time()
logger.info(f"{self.cookie_id}】订单锁 {lock_key} 延迟释放完成")
# 清理锁信息(可选,也可以保留用于统计)
# del self._lock_hold_info[lock_key]
except asyncio.CancelledError:
logger.info(f"{self.cookie_id}】订单锁 {lock_key} 延迟释放任务被取消")
except Exception as e:
logger.error(f"{self.cookie_id}】订单锁 {lock_key} 延迟释放失败: {self._safe_str(e)}")
def is_lock_held(self, lock_key: str) -> bool:
"""
检查指定的锁是否仍在持有状态
Args:
lock_key: 锁的键
Returns:
bool: True表示锁仍在持有False表示锁已释放或不存在
"""
if lock_key not in self._lock_hold_info:
return False
lock_info = self._lock_hold_info[lock_key]
return lock_info.get('locked', False)
def cleanup_expired_locks(self, max_age_hours: int = 24):
"""
清理过期的锁包括自动发货锁和订单详情锁
Args:
max_age_hours: 锁的最大保留时间小时默认24小时
"""
try:
current_time = time.time()
max_age_seconds = max_age_hours * 3600
# 清理自动发货锁
expired_delivery_locks = []
for order_id, last_used in self._lock_usage_times.items():
if current_time - last_used > max_age_seconds:
expired_delivery_locks.append(order_id)
# 清理过期的自动发货锁
for order_id in expired_delivery_locks:
if order_id in self._order_locks:
del self._order_locks[order_id]
if order_id in self._lock_usage_times:
del self._lock_usage_times[order_id]
# 清理锁持有信息
if order_id in self._lock_hold_info:
lock_info = self._lock_hold_info[order_id]
# 取消延迟释放任务
if 'task' in lock_info and lock_info['task']:
lock_info['task'].cancel()
del self._lock_hold_info[order_id]
# 清理订单详情锁
expired_detail_locks = []
for order_id, last_used in self._order_detail_lock_times.items():
if current_time - last_used > max_age_seconds:
expired_detail_locks.append(order_id)
# 清理过期的订单详情锁
for order_id in expired_detail_locks:
if order_id in self._order_detail_locks:
del self._order_detail_locks[order_id]
if order_id in self._order_detail_lock_times:
del self._order_detail_lock_times[order_id]
total_expired = len(expired_delivery_locks) + len(expired_detail_locks)
if total_expired > 0:
logger.info(f"{self.cookie_id}】清理了 {total_expired} 个过期锁 (发货锁: {len(expired_delivery_locks)}, 详情锁: {len(expired_detail_locks)})")
logger.debug(f"{self.cookie_id}】当前锁数量 - 发货锁: {len(self._order_locks)}, 详情锁: {len(self._order_detail_locks)}")
except Exception as e:
logger.error(f"{self.cookie_id}】清理过期锁时发生错误: {self._safe_str(e)}")
def _is_auto_delivery_trigger(self, message: str) -> bool: def _is_auto_delivery_trigger(self, message: str) -> bool:
"""检查消息是否为自动发货触发关键字""" """检查消息是否为自动发货触发关键字"""
@ -354,66 +466,109 @@ class XianyuLive:
else: else:
logger.warning(f'[{msg_time}] 【{self.cookie_id}】❌ 未能提取到订单ID') logger.warning(f'[{msg_time}] 【{self.cookie_id}】❌ 未能提取到订单ID')
# 检查是否可以进行自动发货(防重复)- 基于订单ID # 使用订单ID作为锁的键如果没有订单ID则使用item_id+chat_id组合
if not self.can_auto_delivery(order_id): lock_key = order_id if order_id else f"{item_id}_{chat_id}"
# 第一重检查:延迟锁状态(在获取锁之前检查,避免不必要的等待)
if self.is_lock_held(lock_key):
logger.info(f'[{msg_time}] 【{self.cookie_id}】🔒【提前检查】订单 {lock_key} 延迟锁仍在持有状态,跳过发货')
return return
# 构造用户URL # 第二重检查:基于时间的冷却机制
user_url = f'https://www.goofish.com/personal?userId={send_user_id}' if not self.can_auto_delivery(order_id):
logger.info(f'[{msg_time}] 【{self.cookie_id}】订单 {order_id} 在冷却期内,跳过发货')
return
# 自动发货逻辑 # 获取或创建该订单的锁
try: order_lock = self._order_locks[lock_key]
# 设置默认标题将通过API获取真实商品信息
item_title = "待获取商品信息"
logger.info(f"{self.cookie_id}】准备自动发货: item_id={item_id}, item_title={item_title}") # 更新锁的使用时间
self._lock_usage_times[lock_key] = time.time()
# 调用自动发货方法(包含自动确认发货) # 使用异步锁防止同一订单的并发处理
delivery_content = await self._auto_delivery(item_id, item_title, order_id, send_user_id) async with order_lock:
logger.info(f'[{msg_time}] 【{self.cookie_id}】获取订单锁成功: {lock_key},开始处理自动发货')
if delivery_content: # 第三重检查:获取锁后再次检查延迟锁状态(双重检查,防止在等待锁期间状态发生变化)
# 标记已发货(防重复)- 基于订单ID if self.is_lock_held(lock_key):
self.mark_delivery_sent(order_id) logger.info(f'[{msg_time}] 【{self.cookie_id}】订单 {lock_key} 在获取锁后检查发现延迟锁仍持有,跳过发货')
return
# 检查是否是图片发送标记 # 第四重检查:获取锁后再次检查冷却状态
if delivery_content.startswith("__IMAGE_SEND__"): if not self.can_auto_delivery(order_id):
# 提取卡券ID和图片URL logger.info(f'[{msg_time}] 【{self.cookie_id}】订单 {order_id} 在获取锁后检查发现仍在冷却期,跳过发货')
image_data = delivery_content.replace("__IMAGE_SEND__", "") return
if "|" in image_data:
card_id_str, image_url = image_data.split("|", 1) # 构造用户URL
try: user_url = f'https://www.goofish.com/personal?userId={send_user_id}'
card_id = int(card_id_str)
except ValueError: # 自动发货逻辑
logger.error(f"无效的卡券ID: {card_id_str}") try:
# 设置默认标题将通过API获取真实商品信息
item_title = "待获取商品信息"
logger.info(f"{self.cookie_id}】准备自动发货: item_id={item_id}, item_title={item_title}")
# 调用自动发货方法(包含自动确认发货)
delivery_content = await self._auto_delivery(item_id, item_title, order_id, send_user_id)
if delivery_content:
# 标记已发货(防重复)- 基于订单ID
self.mark_delivery_sent(order_id)
# 标记锁为持有状态,并启动延迟释放任务
self._lock_hold_info[lock_key] = {
'locked': True,
'lock_time': time.time(),
'release_time': None,
'task': None
}
# 启动延迟释放锁的异步任务10分钟后释放
delay_task = asyncio.create_task(self._delayed_lock_release(lock_key, delay_minutes=10))
self._lock_hold_info[lock_key]['task'] = delay_task
# 检查是否是图片发送标记
if delivery_content.startswith("__IMAGE_SEND__"):
# 提取卡券ID和图片URL
image_data = delivery_content.replace("__IMAGE_SEND__", "")
if "|" in image_data:
card_id_str, image_url = image_data.split("|", 1)
try:
card_id = int(card_id_str)
except ValueError:
logger.error(f"无效的卡券ID: {card_id_str}")
card_id = None
else:
# 兼容旧格式没有卡券ID
card_id = None card_id = None
image_url = image_data
# 发送图片消息
try:
await self.send_image_msg(websocket, chat_id, send_user_id, image_url, card_id=card_id)
logger.info(f'[{msg_time}] 【自动发货图片】已向 {user_url} 发送图片: {image_url}')
await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "发货成功")
except Exception as e:
logger.error(f"自动发货图片失败: {self._safe_str(e)}")
await self.send_msg(websocket, chat_id, send_user_id, "抱歉,图片发送失败,请联系客服。")
await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "图片发送失败")
else: else:
# 兼容旧格式没有卡券ID # 普通文本发货内容
card_id = None await self.send_msg(websocket, chat_id, send_user_id, delivery_content)
image_url = image_data logger.info(f'[{msg_time}] 【自动发货】已向 {user_url} 发送发货内容')
# 发送图片消息
try:
await self.send_image_msg(websocket, chat_id, send_user_id, image_url, card_id=card_id)
logger.info(f'[{msg_time}] 【自动发货图片】已向 {user_url} 发送图片: {image_url}')
await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "发货成功") await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "发货成功")
except Exception as e:
logger.error(f"自动发货图片失败: {self._safe_str(e)}")
await self.send_msg(websocket, chat_id, send_user_id, "抱歉,图片发送失败,请联系客服。")
await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "图片发送失败")
else: else:
# 普通文本发货内容 logger.warning(f'[{msg_time}] 【自动发货】未找到匹配的发货规则或获取发货内容失败')
await self.send_msg(websocket, chat_id, send_user_id, delivery_content) # 发送自动发货失败通知
logger.info(f'[{msg_time}] 【自动发货】已向 {user_url} 发送发货内容') await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "未找到匹配的发货规则或获取发货内容失败")
await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "发货成功")
else:
logger.warning(f'[{msg_time}] 【自动发货】未找到匹配的发货规则或获取发货内容失败')
# 发送自动发货失败通知
await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, "未找到匹配的发货规则或获取发货内容失败")
except Exception as e: except Exception as e:
logger.error(f"自动发货处理异常: {self._safe_str(e)}") logger.error(f"自动发货处理异常: {self._safe_str(e)}")
# 发送自动发货异常通知 # 发送自动发货异常通知
await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, f"自动发货处理异常: {str(e)}") await self.send_delivery_failure_notification(send_user_name, send_user_id, item_id, f"自动发货处理异常: {str(e)}")
logger.info(f'[{msg_time}] 【{self.cookie_id}】订单锁释放: {lock_key},自动发货处理完成')
except Exception as e: except Exception as e:
logger.error(f"统一自动发货处理异常: {self._safe_str(e)}") logger.error(f"统一自动发货处理异常: {self._safe_str(e)}")
@ -492,6 +647,7 @@ class XianyuLive:
return new_token return new_token
logger.error(f"{self.cookie_id}】Token刷新失败: {res_json}") logger.error(f"{self.cookie_id}】Token刷新失败: {res_json}")
# 发送Token刷新失败通知 # 发送Token刷新失败通知
await self.send_token_refresh_notification(f"Token刷新失败: {res_json}", "token_refresh_failed") await self.send_token_refresh_notification(f"Token刷新失败: {res_json}", "token_refresh_failed")
return None return None
@ -531,6 +687,40 @@ class XianyuLive:
# 发送Cookie更新失败通知 # 发送Cookie更新失败通知
await self.send_token_refresh_notification(f"Cookie更新失败: {str(e)}", "cookie_update_failed") await self.send_token_refresh_notification(f"Cookie更新失败: {str(e)}", "cookie_update_failed")
async def _restart_instance(self):
"""重启XianyuLive实例"""
try:
logger.info(f"{self.cookie_id}】Token刷新成功准备重启实例...")
# 导入CookieManager
from cookie_manager import manager as cookie_manager
if cookie_manager:
# 通过CookieManager重启实例
logger.info(f"{self.cookie_id}】通过CookieManager重启实例...")
# 使用异步方式调用update_cookie避免阻塞
def restart_task():
try:
cookie_manager.update_cookie(self.cookie_id, self.cookies_str)
logger.info(f"{self.cookie_id}】实例重启请求已发送")
except Exception as e:
logger.error(f"{self.cookie_id}】重启实例失败: {e}")
# 在后台执行重启任务
import threading
restart_thread = threading.Thread(target=restart_task, daemon=True)
restart_thread.start()
logger.info(f"{self.cookie_id}】实例重启已在后台执行")
else:
logger.warning(f"{self.cookie_id}】CookieManager不可用无法重启实例")
except Exception as e:
logger.error(f"{self.cookie_id}】重启实例失败: {self._safe_str(e)}")
# 发送重启失败通知
await self.send_token_refresh_notification(f"实例重启失败: {str(e)}", "instance_restart_failed")
async def save_item_info_to_db(self, item_id: str, item_detail: str = None, item_title: str = None): async def save_item_info_to_db(self, item_id: str, item_detail: str = None, item_title: str = None):
"""保存商品信息到数据库 """保存商品信息到数据库
@ -801,22 +991,6 @@ class XianyuLive:
logger.error("获取商品信息失败,重试次数过多") logger.error("获取商品信息失败,重试次数过多")
return {"error": "获取商品信息失败,重试次数过多"} return {"error": "获取商品信息失败,重试次数过多"}
# 如果是重试retry_count > 0强制刷新token
if retry_count > 0:
old_token = self.cookies.get('_m_h5_tk', '').split('_')[0] if self.cookies.get('_m_h5_tk') else ''
logger.info(f"重试第{retry_count}强制刷新token... 当前_m_h5_tk: {old_token}")
await self.refresh_token()
new_token = self.cookies.get('_m_h5_tk', '').split('_')[0] if self.cookies.get('_m_h5_tk') else ''
logger.info(f"重试刷新token完成新的_m_h5_tk: {new_token}")
else:
# 确保使用最新的token首次调用时的正常逻辑
if not self.current_token or (time.time() - self.last_token_refresh_time) >= self.token_refresh_interval:
old_token = self.cookies.get('_m_h5_tk', '').split('_')[0] if self.cookies.get('_m_h5_tk') else ''
logger.info(f"Token过期或不存在刷新token... 当前_m_h5_tk: {old_token}")
await self.refresh_token()
new_token = self.cookies.get('_m_h5_tk', '').split('_')[0] if self.cookies.get('_m_h5_tk') else ''
logger.info(f"Token刷新完成新的_m_h5_tk: {new_token}")
# 确保session已创建 # 确保session已创建
if not self.session: if not self.session:
await self.create_session() await self.create_session()
@ -844,8 +1018,6 @@ class XianyuLive:
# 始终从最新的cookies中获取_m_h5_tk token刷新后cookies会被更新 # 始终从最新的cookies中获取_m_h5_tk token刷新后cookies会被更新
token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else '' token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else ''
logger.warning(111)
logger.warning(token)
if token: if token:
logger.debug(f"使用cookies中的_m_h5_tk token: {token}") logger.debug(f"使用cookies中的_m_h5_tk token: {token}")
else: else:
@ -1939,100 +2111,108 @@ class XianyuLive:
return {"error": f"加密确认模块调用失败: {self._safe_str(e)}", "order_id": order_id} return {"error": f"加密确认模块调用失败: {self._safe_str(e)}", "order_id": order_id}
async def auto_freeshipping(self, order_id, item_id, buyer_id, retry_count=0): async def auto_freeshipping(self, order_id, item_id, buyer_id, retry_count=0):
"""自动免拼发货 - 使用密模块""" """自动免拼发货 - 使用密模块"""
try: try:
logger.debug(f"{self.cookie_id}】开始免拼发货订单ID: {order_id}") logger.debug(f"{self.cookie_id}】开始免拼发货订单ID: {order_id}")
# 导入超级混淆加密模块 # 导入解密后的免拼发货模块
from secure_freeshipping_ultra import SecureFreeshipping from secure_freeshipping_decrypted import SecureFreeshipping
# 创建加密免拼发货实例 # 创建免拼发货实例
secure_freeshipping = SecureFreeshipping(self.session, self.cookies_str, self.cookie_id) secure_freeshipping = SecureFreeshipping(self.session, self.cookies_str, self.cookie_id)
# 传递必要的属性 # 传递必要的属性
secure_freeshipping.current_token = self.current_token secure_freeshipping.current_token = self.current_token
secure_freeshipping.last_token_refresh_time = self.last_token_refresh_time secure_freeshipping.last_token_refresh_time = self.last_token_refresh_time
secure_freeshipping.token_refresh_interval = self.token_refresh_interval secure_freeshipping.token_refresh_interval = self.token_refresh_interval
secure_freeshipping.refresh_token = self.refresh_token # 传递refresh_token方法
# 调用加密的免拼发货方法 # 调用免拼发货方法
return await secure_freeshipping.auto_freeshipping(order_id, item_id, buyer_id, retry_count) return await secure_freeshipping.auto_freeshipping(order_id, item_id, buyer_id, retry_count)
except Exception as e: except Exception as e:
logger.error(f"{self.cookie_id}加密免拼发货模块调用失败: {self._safe_str(e)}") logger.error(f"{self.cookie_id}免拼发货模块调用失败: {self._safe_str(e)}")
return {"error": f"加密免拼发货模块调用失败: {self._safe_str(e)}", "order_id": order_id} return {"error": f"免拼发货模块调用失败: {self._safe_str(e)}", "order_id": order_id}
async def fetch_order_detail_info(self, order_id: str, item_id: str = None, buyer_id: str = None, debug_headless: bool = None): async def fetch_order_detail_info(self, order_id: str, item_id: str = None, buyer_id: str = None, debug_headless: bool = None):
"""获取订单详情信息""" """获取订单详情信息(使用独立的锁机制,不受延迟锁影响)"""
try: # 使用独立的订单详情锁,不与自动发货锁冲突
logger.info(f"{self.cookie_id}】开始获取订单详情: {order_id}") order_detail_lock = self._order_detail_locks[order_id]
# 导入订单详情获取器 # 记录订单详情锁的使用时间
from utils.order_detail_fetcher import fetch_order_detail_simple self._order_detail_lock_times[order_id] = time.time()
from db_manager import db_manager
# 获取当前账号的cookie字符串 async with order_detail_lock:
cookie_string = self.cookies_str logger.info(f"🔍 【{self.cookie_id}】获取订单详情锁 {order_id},开始处理...")
logger.debug(f"{self.cookie_id}】使用Cookie长度: {len(cookie_string) if cookie_string else 0}")
try:
logger.info(f"{self.cookie_id}】开始获取订单详情: {order_id}")
# 确定是否使用有头模式(调试用) # 导入订单详情获取器
headless_mode = True if debug_headless is None else debug_headless from utils.order_detail_fetcher import fetch_order_detail_simple
if not headless_mode: from db_manager import db_manager
logger.info(f"{self.cookie_id}】🖥️ 启用有头模式进行调试")
# 异步获取订单详情使用当前账号的cookie # 获取当前账号的cookie字符串
result = await fetch_order_detail_simple(order_id, cookie_string, headless=headless_mode) cookie_string = self.cookies_str
logger.debug(f"{self.cookie_id}】使用Cookie长度: {len(cookie_string) if cookie_string else 0}")
if result: # 确定是否使用有头模式(调试用)
logger.info(f"{self.cookie_id}】订单详情获取成功: {order_id}") headless_mode = True if debug_headless is None else debug_headless
logger.info(f"{self.cookie_id}】页面标题: {result.get('title', '未知')}") if not headless_mode:
logger.info(f"{self.cookie_id}】🖥️ 启用有头模式进行调试")
# 获取解析后的规格信息 # 异步获取订单详情使用当前账号的cookie
spec_name = result.get('spec_name', '') result = await fetch_order_detail_simple(order_id, cookie_string, headless=headless_mode)
spec_value = result.get('spec_value', '')
quantity = result.get('quantity', '')
amount = result.get('amount', '')
if spec_name and spec_value: if result:
logger.info(f"{self.cookie_id}】📋 规格名称: {spec_name}") logger.info(f"{self.cookie_id}】订单详情获取成功: {order_id}")
logger.info(f"{self.cookie_id}】📝 规格值: {spec_value}") logger.info(f"{self.cookie_id}】页面标题: {result.get('title', '未知')}")
print(f"🛍️ 【{self.cookie_id}】订单 {order_id} 规格信息: {spec_name} -> {spec_value}")
else:
logger.warning(f"{self.cookie_id}】未获取到有效的规格信息")
print(f"⚠️ 【{self.cookie_id}】订单 {order_id} 规格信息获取失败")
# 插入或更新订单信息到数据库 # 获取解析后的规格信息
try: spec_name = result.get('spec_name', '')
success = db_manager.insert_or_update_order( spec_value = result.get('spec_value', '')
order_id=order_id, quantity = result.get('quantity', '')
item_id=item_id, amount = result.get('amount', '')
buyer_id=buyer_id,
spec_name=spec_name,
spec_value=spec_value,
quantity=quantity,
amount=amount,
order_status='processed', # 已处理状态
cookie_id=self.cookie_id
)
if success: if spec_name and spec_value:
logger.info(f"{self.cookie_id}】订单信息已保存到数据库: {order_id}") logger.info(f"{self.cookie_id}】📋 规格名称: {spec_name}")
print(f"💾 【{self.cookie_id}】订单 {order_id} 信息已保存到数据库") logger.info(f"{self.cookie_id}】📝 规格值: {spec_value}")
print(f"🛍️ 【{self.cookie_id}】订单 {order_id} 规格信息: {spec_name} -> {spec_value}")
else: else:
logger.warning(f"{self.cookie_id}】订单信息保存失败: {order_id}") logger.warning(f"{self.cookie_id}】未获取到有效的规格信息")
print(f"⚠️ 【{self.cookie_id}】订单 {order_id} 规格信息获取失败")
except Exception as db_e: # 插入或更新订单信息到数据库
logger.error(f"{self.cookie_id}】保存订单信息到数据库失败: {self._safe_str(db_e)}") try:
success = db_manager.insert_or_update_order(
order_id=order_id,
item_id=item_id,
buyer_id=buyer_id,
spec_name=spec_name,
spec_value=spec_value,
quantity=quantity,
amount=amount,
order_status='processed', # 已处理状态
cookie_id=self.cookie_id
)
return result if success:
else: logger.info(f"{self.cookie_id}】订单信息已保存到数据库: {order_id}")
logger.warning(f"{self.cookie_id}】订单详情获取失败: {order_id}") print(f"💾 【{self.cookie_id}】订单 {order_id} 信息已保存到数据库")
else:
logger.warning(f"{self.cookie_id}】订单信息保存失败: {order_id}")
except Exception as db_e:
logger.error(f"{self.cookie_id}】保存订单信息到数据库失败: {self._safe_str(db_e)}")
return result
else:
logger.warning(f"{self.cookie_id}】订单详情获取失败: {order_id}")
return None
except Exception as e:
logger.error(f"{self.cookie_id}】获取订单详情异常: {self._safe_str(e)}")
return None return None
except Exception as e:
logger.error(f"{self.cookie_id}】获取订单详情异常: {self._safe_str(e)}")
return None
async def _auto_delivery(self, item_id: str, item_title: str = None, order_id: str = None, send_user_id: str = None): async def _auto_delivery(self, item_id: str, item_title: str = None, order_id: str = None, send_user_id: str = None):
"""自动发货功能 - 获取卡券规则,执行延时,确认发货,发送内容""" """自动发货功能 - 获取卡券规则,执行延时,确认发货,发送内容"""
try: try:
@ -2457,7 +2637,9 @@ class XianyuLive:
logger.info("Token即将过期准备刷新...") logger.info("Token即将过期准备刷新...")
new_token = await self.refresh_token() new_token = await self.refresh_token()
if new_token: if new_token:
logger.info(f"{self.cookie_id}】Token刷新成功准备重新建立连接...") logger.info(f"{self.cookie_id}】Token刷新成功准备重启实例...")
# 注意refresh_token方法中已经调用了_restart_instance()
# 这里只需要关闭当前连接让main循环重新开始
self.connection_restart_flag = True self.connection_restart_flag = True
if self.ws: if self.ws:
await self.ws.close() await self.ws.close()
@ -2605,6 +2787,7 @@ class XianyuLive:
} }
await ws.send(json.dumps(msg)) await ws.send(json.dumps(msg))
self.last_heartbeat_time = time.time() self.last_heartbeat_time = time.time()
logger.debug(f"{self.cookie_id}】心跳包已发送")
async def heartbeat_loop(self, ws): async def heartbeat_loop(self, ws):
"""心跳循环""" """心跳循环"""
@ -2634,22 +2817,25 @@ class XianyuLive:
return False return False
async def pause_cleanup_loop(self): async def pause_cleanup_loop(self):
"""定期清理过期的暂停记录""" """定期清理过期的暂停记录和锁"""
while True: while True:
try: try:
# 检查账号是否启用 # 检查账号是否启用
from cookie_manager import manager as cookie_manager from cookie_manager import manager as cookie_manager
if cookie_manager and not cookie_manager.get_cookie_status(self.cookie_id): if cookie_manager and not cookie_manager.get_cookie_status(self.cookie_id):
logger.info(f"{self.cookie_id}】账号已禁用,停止暂停记录清理循环") logger.info(f"{self.cookie_id}】账号已禁用,停止清理循环")
break break
# 清理过期的暂停记录 # 清理过期的暂停记录
pause_manager.cleanup_expired_pauses() pause_manager.cleanup_expired_pauses()
# 清理过期的锁每5分钟清理一次保留24小时内的锁
self.cleanup_expired_locks(max_age_hours=24)
# 每5分钟清理一次 # 每5分钟清理一次
await asyncio.sleep(300) await asyncio.sleep(300)
except Exception as e: except Exception as e:
logger.error(f"{self.cookie_id}暂停记录清理失败: {self._safe_str(e)}") logger.error(f"{self.cookie_id}清理任务失败: {self._safe_str(e)}")
await asyncio.sleep(300) # 出错后也等待5分钟再重试 await asyncio.sleep(300) # 出错后也等待5分钟再重试
async def send_msg_once(self, toid, item_id, text): async def send_msg_once(self, toid, item_id, text):
@ -3098,32 +3284,7 @@ class XianyuLive:
# 自动回复消息 # 【优先处理】检查系统消息和自动发货触发消息(不受人工接入暂停影响)
if not AUTO_REPLY.get('enabled', True):
logger.info(f"[{msg_time}] 【{self.cookie_id}】【系统】自动回复已禁用")
return
# 检查该chat_id是否处于暂停状态
if pause_manager.is_chat_paused(chat_id):
remaining_time = pause_manager.get_remaining_pause_time(chat_id)
remaining_minutes = remaining_time // 60
remaining_seconds = remaining_time % 60
logger.info(f"[{msg_time}] 【{self.cookie_id}】【系统】chat_id {chat_id} 自动回复已暂停,剩余时间: {remaining_minutes}{remaining_seconds}")
return
# 构造用户URL
user_url = f'https://www.goofish.com/personal?userId={send_user_id}'
reply = None
# 判断是否启用API回复
if AUTO_REPLY.get('api', {}).get('enabled', False):
reply = await self.get_api_reply(
msg_time, user_url, send_user_id, send_user_name,
item_id, send_message, chat_id
)
if not reply:
logger.error(f"[{msg_time}] 【API调用失败】用户: {send_user_name} (ID: {send_user_id}), 商品({item_id}): {send_message}")
if send_message == '[我已拍下,待付款]': if send_message == '[我已拍下,待付款]':
logger.info(f'[{msg_time}] 【{self.cookie_id}】系统消息不处理') logger.info(f'[{msg_time}] 【{self.cookie_id}】系统消息不处理')
return return
@ -3151,13 +3312,14 @@ class XianyuLive:
elif send_message == '[你已发货]': elif send_message == '[你已发货]':
logger.info(f'[{msg_time}] 【{self.cookie_id}】发货确认消息不处理') logger.info(f'[{msg_time}] 【{self.cookie_id}】发货确认消息不处理')
return return
# 检查是否为自动发货触发消息 # 【重要】检查是否为自动发货触发消息 - 即使在人工接入暂停期间也要处理
elif self._is_auto_delivery_trigger(send_message): elif self._is_auto_delivery_trigger(send_message):
logger.info(f'[{msg_time}] 【{self.cookie_id}】检测到自动发货触发消息,即使在暂停期间也继续处理: {send_message}')
# 使用统一的自动发货处理方法 # 使用统一的自动发货处理方法
await self._handle_auto_delivery(websocket, message, send_user_name, send_user_id, await self._handle_auto_delivery(websocket, message, send_user_name, send_user_id,
item_id, chat_id, msg_time) item_id, chat_id, msg_time)
return return
# 【重要】检查是否为"我已小刀,待刀成"卡片消息 - 即使在人工接入暂停期间也要处理
elif send_message == '[卡片消息]': elif send_message == '[卡片消息]':
# 检查是否为"我已小刀,待刀成"的卡片消息 # 检查是否为"我已小刀,待刀成"的卡片消息
try: try:
@ -3183,7 +3345,7 @@ class XianyuLive:
# 检查是否为"我已小刀,待刀成" # 检查是否为"我已小刀,待刀成"
if card_title == "我已小刀,待刀成": if card_title == "我已小刀,待刀成":
logger.info(f'[{msg_time}] 【{self.cookie_id}】【系统】检测到"我已小刀,待刀成"准备自动免拼发货') logger.info(f'[{msg_time}] 【{self.cookie_id}】【系统】检测到"我已小刀,待刀成"即使在暂停期间也继续处理')
# 提取订单ID # 提取订单ID
order_id = self._extract_order_id(message) order_id = self._extract_order_id(message)
if order_id: if order_id:
@ -3197,18 +3359,46 @@ class XianyuLive:
else: else:
logger.warning(f'[{msg_time}] 【{self.cookie_id}】❌ 自动免拼发货失败: {result.get("error", "未知错误")}') logger.warning(f'[{msg_time}] 【{self.cookie_id}】❌ 自动免拼发货失败: {result.get("error", "未知错误")}')
await self._handle_auto_delivery(websocket, message, send_user_name, send_user_id, await self._handle_auto_delivery(websocket, message, send_user_name, send_user_id,
item_id, chat_id, msg_time) item_id, chat_id, msg_time)
return return
else: else:
logger.warning(f'[{msg_time}] 【{self.cookie_id}】❌ 未能提取到订单ID无法执行免拼发货') logger.warning(f'[{msg_time}] 【{self.cookie_id}】❌ 未能提取到订单ID无法执行免拼发货')
return return
else: else:
logger.info(f'[{msg_time}] 【{self.cookie_id}】收到卡片消息,标题: {card_title or "未知"}') logger.info(f'[{msg_time}] 【{self.cookie_id}】收到卡片消息,标题: {card_title or "未知"}')
# 如果不是目标卡片消息,继续正常处理流程(会受到暂停影响)
except Exception as e: except Exception as e:
logger.error(f"处理卡片消息异常: {self._safe_str(e)}") logger.error(f"处理卡片消息异常: {self._safe_str(e)}")
# 如果处理异常,继续正常处理流程(会受到暂停影响)
# 自动回复消息
if not AUTO_REPLY.get('enabled', True):
logger.info(f"[{msg_time}] 【{self.cookie_id}】【系统】自动回复已禁用")
return
# 检查该chat_id是否处于暂停状态
if pause_manager.is_chat_paused(chat_id):
remaining_time = pause_manager.get_remaining_pause_time(chat_id)
remaining_minutes = remaining_time // 60
remaining_seconds = remaining_time % 60
logger.info(f"[{msg_time}] 【{self.cookie_id}】【系统】chat_id {chat_id} 自动回复已暂停,剩余时间: {remaining_minutes}{remaining_seconds}")
return
# 构造用户URL
user_url = f'https://www.goofish.com/personal?userId={send_user_id}'
reply = None
# 判断是否启用API回复
if AUTO_REPLY.get('api', {}).get('enabled', False):
reply = await self.get_api_reply(
msg_time, user_url, send_user_id, send_user_name,
item_id, send_message, chat_id
)
if not reply:
logger.error(f"[{msg_time}] 【API调用失败】用户: {send_user_name} (ID: {send_user_id}), 商品({item_id}): {send_message}")
# 如果不是目标卡片消息,继续正常处理流程
# 记录回复来源 # 记录回复来源
reply_source = 'API' # 默认假设是API回复 reply_source = 'API' # 默认假设是API回复
@ -3316,8 +3506,11 @@ class XianyuLive:
self.cleanup_task = asyncio.create_task(self.pause_cleanup_loop()) self.cleanup_task = asyncio.create_task(self.pause_cleanup_loop())
logger.info(f"{self.cookie_id}】开始监听WebSocket消息...") logger.info(f"{self.cookie_id}】开始监听WebSocket消息...")
logger.info(f"{self.cookie_id}】WebSocket连接状态正常等待服务器消息...")
logger.info(f"{self.cookie_id}】准备进入消息循环...")
async for message in websocket: async for message in websocket:
logger.info(f"{self.cookie_id}】收到WebSocket消息: {len(message) if message else 0} 字节")
try: try:
message_data = json.loads(message) message_data = json.loads(message)
@ -3326,7 +3519,8 @@ class XianyuLive:
continue continue
# 处理其他消息 # 处理其他消息
await self.handle_message(message_data, websocket) # 使用异步任务处理消息,防止阻塞后续消息接收
asyncio.create_task(self.handle_message(message_data, websocket))
except Exception as e: except Exception as e:
logger.error(f"处理消息出错: {self._safe_str(e)}") logger.error(f"处理消息出错: {self._safe_str(e)}")
@ -3364,22 +3558,6 @@ class XianyuLive:
logger.error("获取商品信息失败,重试次数过多") logger.error("获取商品信息失败,重试次数过多")
return {"error": "获取商品信息失败,重试次数过多"} return {"error": "获取商品信息失败,重试次数过多"}
# 如果是重试retry_count > 0强制刷新token
if retry_count > 0:
old_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else ''
logger.info(f"重试第{retry_count}强制刷新token... 当前_m_h5_tk: {old_token}")
await self.refresh_token()
new_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else ''
logger.info(f"重试刷新token完成新的_m_h5_tk: {new_token}")
else:
# 确保使用最新的token首次调用时的正常逻辑
if not self.current_token or (time.time() - self.last_token_refresh_time) >= self.token_refresh_interval:
old_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else ''
logger.info(f"Token过期或不存在刷新token... 当前_m_h5_tk: {old_token}")
await self.refresh_token()
new_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else ''
logger.info(f"Token刷新完成新的_m_h5_tk: {new_token}")
# 确保session已创建 # 确保session已创建
if not self.session: if not self.session:
await self.create_session() await self.create_session()

View File

@ -73,116 +73,6 @@ class SecureConfirm:
logger.error(f"{self.cookie_id}】获取真实商品ID失败: {self._safe_str(e)}") logger.error(f"{self.cookie_id}】获取真实商品ID失败: {self._safe_str(e)}")
return None return None
async def refresh_token_by_detail_api(self, retry_count=0):
"""通过商品详情API刷新token - 参照get_item_info方法"""
if retry_count >= 3: # 最多重试2次
logger.error(f"{self.cookie_id}】通过详情API刷新token失败重试次数过多")
return False
try:
# 优先使用传入的item_id否则从数据库获取
real_item_id = None
if hasattr(self, '_current_item_id') and self._current_item_id:
real_item_id = self._current_item_id
logger.debug(f"{self.cookie_id}】使用传入的商品ID: {real_item_id}")
else:
# 从数据库中获取一个真实的商品ID来请求详情API
real_item_id = await self._get_real_item_id()
if not real_item_id:
logger.warning(f"{self.cookie_id}】无法获取真实商品ID使用默认ID")
real_item_id = "123456789" # 备用ID
params = {
'jsv': '2.7.2',
'appKey': '34839810',
't': str(int(time.time()) * 1000),
'sign': '',
'v': '1.0',
'type': 'originaljson',
'accountSite': 'xianyu',
'dataType': 'json',
'timeout': '20000',
'api': 'mtop.taobao.idle.pc.detail',
'sessionOption': 'AutoLoginOnly',
}
data_val = f'{{"itemId":"{real_item_id}"}}'
data = {
'data': data_val,
}
# 从当前cookies中获取token
token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else ''
if token:
logger.debug(f"{self.cookie_id}】使用当前token刷新: {token}")
else:
logger.warning(f"{self.cookie_id}】当前cookies中没有找到token")
# 生成签名
sign = generate_sign(params['t'], token, data_val)
params['sign'] = sign
logger.info(f"{self.cookie_id}】通过详情API刷新token使用商品ID: {real_item_id}")
async with self.session.post(
'https://h5api.m.goofish.com/h5/mtop.taobao.idle.pc.detail/1.0/',
params=params,
data=data
) as response:
# 检查并更新Cookie
if 'set-cookie' in response.headers:
new_cookies = {}
for cookie in response.headers.getall('set-cookie', []):
if '=' in cookie:
name, value = cookie.split(';')[0].split('=', 1)
new_cookies[name.strip()] = value.strip()
# 更新cookies
if new_cookies:
self.cookies.update(new_cookies)
# 生成新的cookie字符串
self.cookies_str = '; '.join([f"{k}={v}" for k, v in self.cookies.items()])
# 更新数据库中的Cookie
await self._update_config_cookies()
logger.debug(f"{self.cookie_id}】已通过详情API更新Cookie到数据库")
# 获取新的token
new_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else ''
if new_token and new_token != token:
self.current_token = new_token
self.last_token_refresh_time = time.time()
logger.info(f"{self.cookie_id}】通过详情API成功刷新token: {new_token}")
return True
else:
logger.debug(f"{self.cookie_id}】详情API返回的token未变化")
# 检查响应状态
try:
res_json = await response.json()
if isinstance(res_json, dict):
ret_value = res_json.get('ret', [])
if any('SUCCESS::调用成功' in ret for ret in ret_value):
logger.debug(f"{self.cookie_id}】详情API调用成功")
return True
else:
logger.warning(f"{self.cookie_id}】详情API调用失败: {ret_value}")
if retry_count < 2:
await asyncio.sleep(0.5)
return await self.refresh_token_by_detail_api(retry_count + 1)
except:
logger.debug(f"{self.cookie_id}】详情API响应解析失败但可能已获取到新cookies")
return bool(self.current_token)
except Exception as e:
logger.error(f"{self.cookie_id}】通过详情API刷新token异常: {self._safe_str(e)}")
if retry_count < 2:
await asyncio.sleep(0.5)
return await self.refresh_token_by_detail_api(retry_count + 1)
return False
async def _update_config_cookies(self): async def _update_config_cookies(self):
"""更新数据库中的Cookie配置""" """更新数据库中的Cookie配置"""
try: try:
@ -193,41 +83,6 @@ class SecureConfirm:
except Exception as e: except Exception as e:
logger.error(f"{self.cookie_id}】更新数据库Cookie失败: {self._safe_str(e)}") logger.error(f"{self.cookie_id}】更新数据库Cookie失败: {self._safe_str(e)}")
async def refresh_token(self):
"""刷新token - 优先使用详情API失败时调用主界面类的方法"""
# 首先尝试通过详情API刷新token
success = await self.refresh_token_by_detail_api()
if success:
return self.current_token
# 如果详情API失败尝试调用主界面类的方法
if self.main_instance and hasattr(self.main_instance, 'refresh_token'):
try:
logger.debug(f"{self.cookie_id}】详情API刷新失败调用主界面类的refresh_token方法")
new_token = await self.main_instance.refresh_token()
if new_token:
self.current_token = new_token
self.last_token_refresh_time = time.time()
# 更新本地的cookies_str
self.cookies_str = self.main_instance.cookies_str
# 重新解析cookies
self.cookies = {}
if self.cookies_str:
for cookie in self.cookies_str.split(';'):
if '=' in cookie:
key, value = cookie.strip().split('=', 1)
self.cookies[key] = value
logger.debug(f"{self.cookie_id}】通过主界面类Token刷新成功已同步cookies")
return new_token
else:
logger.warning(f"{self.cookie_id}】主界面类Token刷新失败")
return None
except Exception as e:
logger.error(f"{self.cookie_id}】调用主界面类refresh_token失败: {self._safe_str(e)}")
return None
else:
logger.warning(f"{self.cookie_id}】主界面类实例不存在或没有refresh_token方法")
return None
async def auto_confirm(self, order_id, item_id=None, retry_count=0): async def auto_confirm(self, order_id, item_id=None, retry_count=0):
"""自动确认发货 - 使用真实商品ID刷新token""" """自动确认发货 - 使用真实商品ID刷新token"""
@ -240,22 +95,6 @@ class SecureConfirm:
self._current_item_id = item_id self._current_item_id = item_id
logger.debug(f"{self.cookie_id}】设置当前商品ID: {item_id}") logger.debug(f"{self.cookie_id}】设置当前商品ID: {item_id}")
# 如果是重试retry_count > 0强制刷新token
if retry_count > 0:
old_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else ''
logger.info(f"{self.cookie_id}】重试第{retry_count}强制刷新token... 当前_m_h5_tk: {old_token}")
await self.refresh_token()
new_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else ''
logger.info(f"{self.cookie_id}】重试刷新token完成新的_m_h5_tk: {new_token}")
else:
# 确保使用最新的token首次调用时的正常逻辑
if not self.current_token or (time.time() - self.last_token_refresh_time) >= self.token_refresh_interval:
old_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else ''
logger.info(f"{self.cookie_id}】Token过期或不存在刷新token... 当前_m_h5_tk: {old_token}")
await self.refresh_token()
new_token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else ''
logger.info(f"{self.cookie_id}】Token刷新完成新的_m_h5_tk: {new_token}")
# 确保session已创建 # 确保session已创建
if not self.session: if not self.session:
raise Exception("Session未创建") raise Exception("Session未创建")

View File

@ -0,0 +1,134 @@
import asyncio
import time
from loguru import logger
from utils.xianyu_utils import trans_cookies, generate_sign
class SecureFreeshipping:
def __init__(self, session, cookies_str, cookie_id):
self.session = session
self.cookies_str = cookies_str
self.cookie_id = cookie_id
self.cookies = trans_cookies(cookies_str) if cookies_str else {}
# 这些属性将由主类传递
self.current_token = None
self.last_token_refresh_time = None
self.token_refresh_interval = None
def _safe_str(self, obj):
"""安全转换为字符串"""
try:
return str(obj)
except:
return "无法转换的对象"
async def update_config_cookies(self):
"""更新数据库中的cookies"""
try:
from db_manager import db_manager
# 更新数据库中的Cookie
db_manager.update_config_cookies(self.cookie_id, self.cookies_str)
logger.debug(f"{self.cookie_id}】Cookie已更新到数据库")
except Exception as e:
logger.error(f"{self.cookie_id}】更新Cookie到数据库失败: {self._safe_str(e)}")
async def auto_freeshipping(self, order_id, item_id, buyer_id, retry_count=0):
"""自动免拼发货 - 加密版本"""
if retry_count >= 4: # 最多重试3次
logger.error("免拼发货发货失败,重试次数过多")
return {"error": "免拼发货发货失败,重试次数过多"}
# 确保session已创建
if not self.session:
raise Exception("Session未创建")
params = {
'jsv': '2.7.2',
'appKey': '34839810',
't': str(int(time.time()) * 1000),
'sign': '',
'v': '1.0',
'type': 'originaljson',
'accountSite': 'xianyu',
'dataType': 'json',
'timeout': '20000',
'api': 'mtop.idle.groupon.activity.seller.freeshipping',
'sessionOption': 'AutoLoginOnly',
}
data_val = '{"bizOrderId":"' + order_id + '", "itemId":' + item_id + ',"buyerId":' + buyer_id + '}'
data = {
'data': data_val,
}
# 打印参数信息
logger.info(f"{self.cookie_id}】免拼发货请求参数: data_val = {data_val}")
logger.info(f"{self.cookie_id}】参数详情 - order_id: {order_id}, item_id: {item_id}, buyer_id: {buyer_id}")
# 始终从最新的cookies中获取_m_h5_tk token刷新后cookies会被更新
token = trans_cookies(self.cookies_str).get('_m_h5_tk', '').split('_')[0] if trans_cookies(self.cookies_str).get('_m_h5_tk') else ''
if token:
logger.info(f"使用cookies中的_m_h5_tk token: {token}")
else:
logger.warning("cookies中没有找到_m_h5_tk token")
sign = generate_sign(params['t'], token, data_val)
params['sign'] = sign
try:
logger.info(f"{self.cookie_id}】开始自动免拼发货订单ID: {order_id}")
async with self.session.post(
'https://h5api.m.goofish.com/h5/mtop.idle.groupon.activity.seller.freeshipping/1.0/',
params=params,
data=data
) as response:
res_json = await response.json()
# 检查并更新Cookie
if 'set-cookie' in response.headers:
new_cookies = {}
for cookie in response.headers.getall('set-cookie', []):
if '=' in cookie:
name, value = cookie.split(';')[0].split('=', 1)
new_cookies[name.strip()] = value.strip()
# 更新cookies
if new_cookies:
self.cookies.update(new_cookies)
# 生成新的cookie字符串
self.cookies_str = '; '.join([f"{k}={v}" for k, v in self.cookies.items()])
# 更新数据库中的Cookie
await self.update_config_cookies()
logger.debug("已更新Cookie到数据库")
logger.info(f"{self.cookie_id}】自动免拼发货响应: {res_json}")
# 检查响应结果
if res_json.get('ret') and res_json['ret'][0] == 'SUCCESS::调用成功':
logger.info(f"{self.cookie_id}】✅ 自动免拼发货成功订单ID: {order_id}")
return {"success": True, "order_id": order_id}
else:
error_msg = res_json.get('ret', ['未知错误'])[0] if res_json.get('ret') else '未知错误'
logger.warning(f"{self.cookie_id}】❌ 自动免拼发货失败: {error_msg}")
# 如果是token相关错误进行重试
if 'token' in error_msg.lower() or 'sign' in error_msg.lower():
logger.info(f"{self.cookie_id}】检测到token错误准备重试...")
return await self.auto_freeshipping(order_id, item_id, buyer_id, retry_count + 1)
return {"error": error_msg, "order_id": order_id}
except Exception as e:
logger.error(f"{self.cookie_id}】自动免拼发货API请求异常: {self._safe_str(e)}")
await asyncio.sleep(0.5)
# 网络异常也进行重试
if retry_count < 2:
logger.info(f"{self.cookie_id}】网络异常,准备重试...")
return await self.auto_freeshipping(order_id, item_id, buyer_id, retry_count + 1)
return {"error": f"网络异常: {self._safe_str(e)}", "order_id": order_id}