from __future__ import annotations import asyncio from typing import Dict, List, Tuple, Optional from loguru import logger from db_manager import db_manager __all__ = ["CookieManager", "manager"] class CookieManager: """管理多账号 Cookie 及其对应的 XianyuLive 任务和关键字""" def __init__(self, loop: asyncio.AbstractEventLoop): self.loop = loop self.cookies: Dict[str, str] = {} self.tasks: Dict[str, asyncio.Task] = {} self.keywords: Dict[str, List[Tuple[str, str]]] = {} self.cookie_status: Dict[str, bool] = {} # 账号启用状态 self._load_from_db() def _load_from_db(self): """从数据库加载所有Cookie、关键字和状态""" try: # 加载所有Cookie self.cookies = db_manager.get_all_cookies() # 加载所有关键字 self.keywords = db_manager.get_all_keywords() # 加载所有Cookie状态(默认启用) self.cookie_status = db_manager.get_all_cookie_status() # 为没有状态记录的Cookie设置默认启用状态 for cookie_id in self.cookies.keys(): if cookie_id not in self.cookie_status: self.cookie_status[cookie_id] = True logger.info(f"从数据库加载了 {len(self.cookies)} 个Cookie、{len(self.keywords)} 组关键字和 {len(self.cookie_status)} 个状态记录") except Exception as e: logger.error(f"从数据库加载数据失败: {e}") def reload_from_db(self): """重新从数据库加载所有数据(用于备份导入后刷新)""" logger.info("重新从数据库加载数据...") old_cookies_count = len(self.cookies) old_keywords_count = len(self.keywords) # 重新加载数据 self._load_from_db() new_cookies_count = len(self.cookies) new_keywords_count = len(self.keywords) logger.info(f"数据重新加载完成: Cookie {old_cookies_count} -> {new_cookies_count}, 关键字组 {old_keywords_count} -> {new_keywords_count}") return True # ------------------------ 内部协程 ------------------------ async def _run_xianyu(self, cookie_id: str, cookie_value: str, user_id: int = None): """在事件循环中启动 XianyuLive.main""" from XianyuAutoAsync import XianyuLive # 延迟导入,避免循环 try: live = XianyuLive(cookie_value, cookie_id=cookie_id, user_id=user_id) await live.main() except asyncio.CancelledError: logger.info(f"XianyuLive 任务已取消: {cookie_id}") except Exception as e: logger.error(f"XianyuLive 任务异常({cookie_id}): {e}") async def _add_cookie_async(self, cookie_id: str, cookie_value: str, user_id: int = None): if cookie_id in self.tasks: raise ValueError("Cookie ID already exists") self.cookies[cookie_id] = cookie_value # 保存到数据库,如果没有指定user_id,则保持原有绑定关系 db_manager.save_cookie(cookie_id, cookie_value, user_id) # 获取实际保存的user_id(如果没有指定,数据库会返回实际的user_id) actual_user_id = user_id if actual_user_id is None: # 从数据库获取Cookie对应的user_id cookie_info = db_manager.get_cookie_details(cookie_id) if cookie_info: actual_user_id = cookie_info.get('user_id') task = self.loop.create_task(self._run_xianyu(cookie_id, cookie_value, actual_user_id)) self.tasks[cookie_id] = task logger.info(f"已启动账号任务: {cookie_id} (用户ID: {actual_user_id})") async def _remove_cookie_async(self, cookie_id: str): task = self.tasks.pop(cookie_id, None) if task: task.cancel() self.cookies.pop(cookie_id, None) self.keywords.pop(cookie_id, None) # 从数据库删除 db_manager.delete_cookie(cookie_id) logger.info(f"已移除账号: {cookie_id}") # ------------------------ 对外线程安全接口 ------------------------ def add_cookie(self, cookie_id: str, cookie_value: str, kw_list: Optional[List[Tuple[str, str]]] = None, user_id: int = None): """线程安全新增 Cookie 并启动任务""" if kw_list is not None: self.keywords[cookie_id] = kw_list else: self.keywords.setdefault(cookie_id, []) try: current_loop = asyncio.get_running_loop() except RuntimeError: current_loop = None if current_loop and current_loop == self.loop: # 同一事件循环中,直接调度 return self.loop.create_task(self._add_cookie_async(cookie_id, cookie_value, user_id)) else: fut = asyncio.run_coroutine_threadsafe(self._add_cookie_async(cookie_id, cookie_value, user_id), self.loop) return fut.result() def remove_cookie(self, cookie_id: str): try: current_loop = asyncio.get_running_loop() except RuntimeError: current_loop = None if current_loop and current_loop == self.loop: return self.loop.create_task(self._remove_cookie_async(cookie_id)) else: fut = asyncio.run_coroutine_threadsafe(self._remove_cookie_async(cookie_id), self.loop) return fut.result() # 更新 Cookie 值 def update_cookie(self, cookie_id: str, new_value: str): """替换指定账号的 Cookie 并重启任务""" async def _update(): # 获取原有的user_id和关键词 original_user_id = None original_keywords = [] original_status = True cookie_info = db_manager.get_cookie_details(cookie_id) if cookie_info: original_user_id = cookie_info.get('user_id') # 保存原有的关键词和状态 if cookie_id in self.keywords: original_keywords = self.keywords[cookie_id].copy() if cookie_id in self.cookie_status: original_status = self.cookie_status[cookie_id] # 先移除任务(但不删除数据库记录) task = self.tasks.pop(cookie_id, None) if task: task.cancel() # 更新Cookie值(保持原有user_id,不删除关键词) self.cookies[cookie_id] = new_value db_manager.save_cookie(cookie_id, new_value, original_user_id) # 恢复关键词和状态 self.keywords[cookie_id] = original_keywords self.cookie_status[cookie_id] = original_status # 重新启动任务 task = self.loop.create_task(self._run_xianyu(cookie_id, new_value, original_user_id)) self.tasks[cookie_id] = task logger.info(f"已更新Cookie并重启任务: {cookie_id} (用户ID: {original_user_id}, 关键词: {len(original_keywords)}条)") try: current_loop = asyncio.get_running_loop() except RuntimeError: current_loop = None if current_loop and current_loop == self.loop: return self.loop.create_task(_update()) else: fut = asyncio.run_coroutine_threadsafe(_update(), self.loop) return fut.result() def update_keywords(self, cookie_id: str, kw_list: List[Tuple[str, str]]): """线程安全更新关键字""" self.keywords[cookie_id] = kw_list # 保存到数据库 db_manager.save_keywords(cookie_id, kw_list) logger.info(f"更新关键字: {cookie_id} -> {len(kw_list)} 条") # 查询接口 def list_cookies(self): return list(self.cookies.keys()) def get_keywords(self, cookie_id: str) -> List[Tuple[str, str]]: return self.keywords.get(cookie_id, []) def update_cookie_status(self, cookie_id: str, enabled: bool): """更新Cookie的启用/禁用状态""" if cookie_id not in self.cookies: raise ValueError(f"Cookie ID {cookie_id} 不存在") old_status = self.cookie_status.get(cookie_id, True) self.cookie_status[cookie_id] = enabled # 保存到数据库 db_manager.save_cookie_status(cookie_id, enabled) logger.info(f"更新Cookie状态: {cookie_id} -> {'启用' if enabled else '禁用'}") # 如果状态发生变化,需要启动或停止任务 if old_status != enabled: if enabled: # 启用账号:启动任务 self._start_cookie_task(cookie_id) else: # 禁用账号:停止任务 self._stop_cookie_task(cookie_id) def get_cookie_status(self, cookie_id: str) -> bool: """获取Cookie的启用状态""" return self.cookie_status.get(cookie_id, True) # 默认启用 def get_enabled_cookies(self) -> Dict[str, str]: """获取所有启用的Cookie""" return {cid: value for cid, value in self.cookies.items() if self.cookie_status.get(cid, True)} def _start_cookie_task(self, cookie_id: str): """启动指定Cookie的任务""" if cookie_id in self.tasks: logger.warning(f"Cookie任务已存在,跳过启动: {cookie_id}") return cookie_value = self.cookies.get(cookie_id) if not cookie_value: logger.error(f"Cookie值不存在,无法启动任务: {cookie_id}") return try: # 获取Cookie对应的user_id cookie_info = db_manager.get_cookie_details(cookie_id) user_id = cookie_info.get('user_id') if cookie_info else None # 使用异步方式启动任务 if hasattr(self.loop, 'is_running') and self.loop.is_running(): # 事件循环正在运行,使用run_coroutine_threadsafe fut = asyncio.run_coroutine_threadsafe( self._add_cookie_async(cookie_id, cookie_value, user_id), self.loop ) fut.result(timeout=5) # 等待最多5秒 else: # 事件循环未运行,直接创建任务 task = self.loop.create_task(self._run_xianyu(cookie_id, cookie_value, user_id)) self.tasks[cookie_id] = task logger.info(f"成功启动Cookie任务: {cookie_id}") except Exception as e: logger.error(f"启动Cookie任务失败: {cookie_id}, {e}") def _stop_cookie_task(self, cookie_id: str): """停止指定Cookie的任务""" if cookie_id not in self.tasks: logger.warning(f"Cookie任务不存在,跳过停止: {cookie_id}") return try: task = self.tasks[cookie_id] if not task.done(): task.cancel() logger.info(f"已取消Cookie任务: {cookie_id}") del self.tasks[cookie_id] logger.info(f"成功停止Cookie任务: {cookie_id}") except Exception as e: logger.error(f"停止Cookie任务失败: {cookie_id}, {e}") # 在 Start.py 中会把此变量赋值为具体实例 manager: Optional[CookieManager] = None