xianyu-auto-reply/reply_server.py
2025-07-31 23:17:33 +08:00

3081 lines
110 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI, HTTPException, Depends, status, UploadFile, File
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse, StreamingResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from typing import List, Tuple, Optional, Dict, Any
from pathlib import Path
from urllib.parse import unquote
import hashlib
import secrets
import time
import json
import os
import uvicorn
import pandas as pd
import io
import cookie_manager
from db_manager import db_manager
from file_log_collector import setup_file_logging, get_file_log_collector
from ai_reply_engine import ai_reply_engine
from loguru import logger
# 关键字文件路径
KEYWORDS_FILE = Path(__file__).parent / "回复关键字.txt"
# 简单的用户认证配置
ADMIN_USERNAME = "admin"
DEFAULT_ADMIN_PASSWORD = "admin123" # 系统初始化时的默认密码
SESSION_TOKENS = {} # 存储会话token: {token: {'user_id': int, 'username': str, 'timestamp': float}}
TOKEN_EXPIRE_TIME = 24 * 60 * 60 # token过期时间24小时
# HTTP Bearer认证
security = HTTPBearer(auto_error=False)
# 不再需要单独的密码初始化,由数据库初始化时处理
def load_keywords() -> List[Tuple[str, str]]:
"""读取关键字→回复映射表
文件格式支持:
关键字<空格/制表符/冒号>回复内容
忽略空行和以 # 开头的注释行
"""
mapping: List[Tuple[str, str]] = []
if not KEYWORDS_FILE.exists():
return mapping
with KEYWORDS_FILE.open('r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
# 尝试用\t、空格、冒号分隔
if '\t' in line:
key, reply = line.split('\t', 1)
elif ' ' in line:
key, reply = line.split(' ', 1)
elif ':' in line:
key, reply = line.split(':', 1)
else:
# 无法解析的行,跳过
continue
mapping.append((key.strip(), reply.strip()))
return mapping
KEYWORDS_MAPPING = load_keywords()
# 认证相关模型
class LoginRequest(BaseModel):
username: Optional[str] = None
password: Optional[str] = None
email: Optional[str] = None
verification_code: Optional[str] = None
class LoginResponse(BaseModel):
success: bool
token: Optional[str] = None
message: str
user_id: Optional[int] = None
class ChangePasswordRequest(BaseModel):
current_password: str
new_password: str
class RegisterRequest(BaseModel):
username: str
email: str
password: str
verification_code: str
class RegisterResponse(BaseModel):
success: bool
message: str
class SendCodeRequest(BaseModel):
email: str
session_id: Optional[str] = None
type: Optional[str] = 'register' # 'register' 或 'login'
class SendCodeResponse(BaseModel):
success: bool
message: str
class CaptchaRequest(BaseModel):
session_id: str
class CaptchaResponse(BaseModel):
success: bool
captcha_image: str
session_id: str
message: str
class VerifyCaptchaRequest(BaseModel):
session_id: str
captcha_code: str
class VerifyCaptchaResponse(BaseModel):
success: bool
message: str
def generate_token() -> str:
"""生成随机token"""
return secrets.token_urlsafe(32)
def verify_token(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> Optional[Dict[str, Any]]:
"""验证token并返回用户信息"""
if not credentials:
return None
token = credentials.credentials
if token not in SESSION_TOKENS:
return None
token_data = SESSION_TOKENS[token]
# 检查token是否过期
if time.time() - token_data['timestamp'] > TOKEN_EXPIRE_TIME:
del SESSION_TOKENS[token]
return None
return token_data
def verify_admin_token(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> Dict[str, Any]:
"""验证管理员token"""
user_info = verify_token(credentials)
if not user_info:
raise HTTPException(status_code=401, detail="未授权访问")
# 检查是否是管理员
if user_info['username'] != ADMIN_USERNAME:
raise HTTPException(status_code=403, detail="需要管理员权限")
return user_info
def require_auth(user_info: Optional[Dict[str, Any]] = Depends(verify_token)):
"""需要认证的依赖,返回用户信息"""
if not user_info:
raise HTTPException(status_code=401, detail="未授权访问")
return user_info
def get_current_user(user_info: Dict[str, Any] = Depends(require_auth)) -> Dict[str, Any]:
"""获取当前登录用户信息"""
return user_info
def get_current_user_optional(user_info: Optional[Dict[str, Any]] = Depends(verify_token)) -> Optional[Dict[str, Any]]:
"""获取当前用户信息(可选,不强制要求登录)"""
return user_info
def get_user_log_prefix(user_info: Dict[str, Any] = None) -> str:
"""获取用户日志前缀"""
if user_info:
return f"{user_info['username']}#{user_info['user_id']}"
return "【系统】"
def require_admin(current_user: Dict[str, Any] = Depends(get_current_user)) -> Dict[str, Any]:
"""要求管理员权限"""
if current_user['username'] != 'admin':
raise HTTPException(status_code=403, detail="需要管理员权限")
return current_user
def log_with_user(level: str, message: str, user_info: Dict[str, Any] = None):
"""带用户信息的日志记录"""
prefix = get_user_log_prefix(user_info)
full_message = f"{prefix} {message}"
if level.lower() == 'info':
logger.info(full_message)
elif level.lower() == 'error':
logger.error(full_message)
elif level.lower() == 'warning':
logger.warning(full_message)
elif level.lower() == 'debug':
logger.debug(full_message)
else:
logger.info(full_message)
def match_reply(cookie_id: str, message: str) -> Optional[str]:
"""根据 cookie_id 及消息内容匹配回复
只有启用的账号才会匹配关键字回复
"""
mgr = cookie_manager.manager
if mgr is None:
return None
# 检查账号是否启用
if not mgr.get_cookie_status(cookie_id):
return None # 禁用的账号不参与自动回复
# 优先账号级关键字
if mgr.get_keywords(cookie_id):
for k, r in mgr.get_keywords(cookie_id):
if k in message:
return r
# 全局关键字
for k, r in KEYWORDS_MAPPING:
if k in message:
return r
return None
class RequestModel(BaseModel):
cookie_id: str
msg_time: str
user_url: str
send_user_id: str
send_user_name: str
item_id: str
send_message: str
chat_id: str
class ResponseData(BaseModel):
send_msg: str
class ResponseModel(BaseModel):
code: int
data: ResponseData
app = FastAPI(
title="Xianyu Auto Reply API",
version="1.0.0",
description="闲鱼自动回复系统API",
docs_url="/docs",
redoc_url="/redoc"
)
# 初始化文件日志收集器
setup_file_logging()
# 添加一条测试日志
from loguru import logger
logger.info("Web服务器启动文件日志收集器已初始化")
# 添加请求日志中间件
@app.middleware("http")
async def log_requests(request, call_next):
start_time = time.time()
# 获取用户信息
user_info = "未登录"
try:
# 从请求头中获取Authorization
auth_header = request.headers.get("Authorization")
if auth_header and auth_header.startswith("Bearer "):
token = auth_header.split(" ")[1]
if token in SESSION_TOKENS:
token_data = SESSION_TOKENS[token]
# 检查token是否过期
if time.time() - token_data['timestamp'] <= TOKEN_EXPIRE_TIME:
user_info = f"{token_data['username']}#{token_data['user_id']}"
except Exception:
pass
logger.info(f"🌐 {user_info} API请求: {request.method} {request.url.path}")
response = await call_next(request)
process_time = time.time() - start_time
logger.info(f"{user_info} API响应: {request.method} {request.url.path} - {response.status_code} ({process_time:.3f}s)")
return response
# 提供前端静态文件
import os
static_dir = os.path.join(os.path.dirname(__file__), 'static')
if not os.path.exists(static_dir):
os.makedirs(static_dir, exist_ok=True)
app.mount('/static', StaticFiles(directory=static_dir), name='static')
# 健康检查端点
@app.get('/health')
async def health_check():
"""健康检查端点用于Docker健康检查和负载均衡器"""
try:
# 检查Cookie管理器状态
manager_status = "ok" if cookie_manager.manager is not None else "error"
# 检查数据库连接
from db_manager import db_manager
try:
db_manager.get_all_cookies()
db_status = "ok"
except Exception:
db_status = "error"
# 获取系统状态
import psutil
cpu_percent = psutil.cpu_percent(interval=1)
memory_info = psutil.virtual_memory()
status = {
"status": "healthy" if manager_status == "ok" and db_status == "ok" else "unhealthy",
"timestamp": time.time(),
"services": {
"cookie_manager": manager_status,
"database": db_status
},
"system": {
"cpu_percent": cpu_percent,
"memory_percent": memory_info.percent,
"memory_available": memory_info.available
}
}
if status["status"] == "unhealthy":
raise HTTPException(status_code=503, detail=status)
return status
except Exception as e:
return {
"status": "unhealthy",
"timestamp": time.time(),
"error": str(e)
}
# 重定向根路径到登录页面
@app.get('/', response_class=HTMLResponse)
async def root():
login_path = os.path.join(static_dir, 'login.html')
if os.path.exists(login_path):
with open(login_path, 'r', encoding='utf-8') as f:
return HTMLResponse(f.read())
else:
return HTMLResponse('<h3>Login page not found</h3>')
# 登录页面路由
@app.get('/login.html', response_class=HTMLResponse)
async def login_page():
login_path = os.path.join(static_dir, 'login.html')
if os.path.exists(login_path):
with open(login_path, 'r', encoding='utf-8') as f:
return HTMLResponse(f.read())
else:
return HTMLResponse('<h3>Login page not found</h3>')
# 注册页面路由
@app.get('/register.html', response_class=HTMLResponse)
async def register_page():
register_path = os.path.join(static_dir, 'register.html')
if os.path.exists(register_path):
with open(register_path, 'r', encoding='utf-8') as f:
return HTMLResponse(f.read())
else:
return HTMLResponse('<h3>Register page not found</h3>')
# 管理页面不需要服务器端认证由前端JavaScript处理
@app.get('/admin', response_class=HTMLResponse)
async def admin_page():
index_path = os.path.join(static_dir, 'index.html')
if not os.path.exists(index_path):
return HTMLResponse('<h3>No front-end found</h3>')
with open(index_path, 'r', encoding='utf-8') as f:
return HTMLResponse(f.read())
# 用户管理页面路由
@app.get('/user_management.html', response_class=HTMLResponse)
async def user_management_page():
page_path = os.path.join(static_dir, 'user_management.html')
if os.path.exists(page_path):
with open(page_path, 'r', encoding='utf-8') as f:
return HTMLResponse(f.read())
else:
return HTMLResponse('<h3>User management page not found</h3>')
# 日志管理页面路由
@app.get('/log_management.html', response_class=HTMLResponse)
async def log_management_page():
page_path = os.path.join(static_dir, 'log_management.html')
if os.path.exists(page_path):
with open(page_path, 'r', encoding='utf-8') as f:
return HTMLResponse(f.read())
else:
return HTMLResponse('<h3>Log management page not found</h3>')
# 数据管理页面路由
@app.get('/data_management.html', response_class=HTMLResponse)
async def data_management_page():
page_path = os.path.join(static_dir, 'data_management.html')
if os.path.exists(page_path):
with open(page_path, 'r', encoding='utf-8') as f:
return HTMLResponse(f.read())
else:
return HTMLResponse('<h3>Data management page not found</h3>')
# 商品搜索页面路由
@app.get('/item_search.html', response_class=HTMLResponse)
async def item_search_page():
page_path = os.path.join(static_dir, 'item_search.html')
if os.path.exists(page_path):
with open(page_path, 'r', encoding='utf-8') as f:
return HTMLResponse(f.read())
else:
return HTMLResponse('<h3>Item search page not found</h3>')
# 登录接口
@app.post('/login')
async def login(request: LoginRequest):
from db_manager import db_manager
# 判断登录方式
if request.username and request.password:
# 用户名/密码登录
logger.info(f"{request.username}】尝试用户名登录")
# 统一使用用户表验证包括admin用户
if db_manager.verify_user_password(request.username, request.password):
user = db_manager.get_user_by_username(request.username)
if user:
# 生成token
token = generate_token()
SESSION_TOKENS[token] = {
'user_id': user['id'],
'username': user['username'],
'timestamp': time.time()
}
# 区分管理员和普通用户的日志
if user['username'] == ADMIN_USERNAME:
logger.info(f"{user['username']}#{user['id']}】登录成功(管理员)")
else:
logger.info(f"{user['username']}#{user['id']}】登录成功")
return LoginResponse(
success=True,
token=token,
message="登录成功",
user_id=user['id']
)
logger.warning(f"{request.username}】登录失败:用户名或密码错误")
return LoginResponse(
success=False,
message="用户名或密码错误"
)
elif request.email and request.password:
# 邮箱/密码登录
logger.info(f"{request.email}】尝试邮箱密码登录")
user = db_manager.get_user_by_email(request.email)
if user and db_manager.verify_user_password(user['username'], request.password):
# 生成token
token = generate_token()
SESSION_TOKENS[token] = {
'user_id': user['id'],
'username': user['username'],
'timestamp': time.time()
}
logger.info(f"{user['username']}#{user['id']}】邮箱登录成功")
return LoginResponse(
success=True,
token=token,
message="登录成功",
user_id=user['id']
)
logger.warning(f"{request.email}】邮箱登录失败:邮箱或密码错误")
return LoginResponse(
success=False,
message="邮箱或密码错误"
)
elif request.email and request.verification_code:
# 邮箱/验证码登录
logger.info(f"{request.email}】尝试邮箱验证码登录")
# 验证邮箱验证码
if not db_manager.verify_email_code(request.email, request.verification_code, 'login'):
logger.warning(f"{request.email}】验证码登录失败:验证码错误或已过期")
return LoginResponse(
success=False,
message="验证码错误或已过期"
)
# 获取用户信息
user = db_manager.get_user_by_email(request.email)
if not user:
logger.warning(f"{request.email}】验证码登录失败:用户不存在")
return LoginResponse(
success=False,
message="用户不存在"
)
# 生成token
token = generate_token()
SESSION_TOKENS[token] = {
'user_id': user['id'],
'username': user['username'],
'timestamp': time.time()
}
logger.info(f"{user['username']}#{user['id']}】验证码登录成功")
return LoginResponse(
success=True,
token=token,
message="登录成功",
user_id=user['id']
)
else:
return LoginResponse(
success=False,
message="请提供有效的登录信息"
)
# 验证token接口
@app.get('/verify')
async def verify(user_info: Optional[Dict[str, Any]] = Depends(verify_token)):
if user_info:
return {
"authenticated": True,
"user_id": user_info['user_id'],
"username": user_info['username']
}
return {"authenticated": False}
# 登出接口
@app.post('/logout')
async def logout(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)):
if credentials and credentials.credentials in SESSION_TOKENS:
del SESSION_TOKENS[credentials.credentials]
return {"message": "已登出"}
# 修改管理员密码接口
@app.post('/change-admin-password')
async def change_admin_password(request: ChangePasswordRequest, admin_user: Dict[str, Any] = Depends(verify_admin_token)):
from db_manager import db_manager
try:
# 验证当前密码(使用用户表验证)
if not db_manager.verify_user_password('admin', request.current_password):
return {"success": False, "message": "当前密码错误"}
# 更新密码(使用用户表更新)
success = db_manager.update_user_password('admin', request.new_password)
if success:
logger.info(f"【admin#{admin_user['user_id']}】管理员密码修改成功")
return {"success": True, "message": "密码修改成功"}
else:
return {"success": False, "message": "密码修改失败"}
except Exception as e:
logger.error(f"修改管理员密码异常: {e}")
return {"success": False, "message": "系统错误"}
# 生成图形验证码接口
@app.post('/generate-captcha')
async def generate_captcha(request: CaptchaRequest):
from db_manager import db_manager
try:
# 生成图形验证码
captcha_text, captcha_image = db_manager.generate_captcha()
if not captcha_image:
return CaptchaResponse(
success=False,
captcha_image="",
session_id=request.session_id,
message="图形验证码生成失败"
)
# 保存验证码到数据库
if db_manager.save_captcha(request.session_id, captcha_text):
return CaptchaResponse(
success=True,
captcha_image=captcha_image,
session_id=request.session_id,
message="图形验证码生成成功"
)
else:
return CaptchaResponse(
success=False,
captcha_image="",
session_id=request.session_id,
message="图形验证码保存失败"
)
except Exception as e:
logger.error(f"生成图形验证码失败: {e}")
return CaptchaResponse(
success=False,
captcha_image="",
session_id=request.session_id,
message="图形验证码生成失败"
)
# 验证图形验证码接口
@app.post('/verify-captcha')
async def verify_captcha(request: VerifyCaptchaRequest):
from db_manager import db_manager
try:
if db_manager.verify_captcha(request.session_id, request.captcha_code):
return VerifyCaptchaResponse(
success=True,
message="图形验证码验证成功"
)
else:
return VerifyCaptchaResponse(
success=False,
message="图形验证码错误或已过期"
)
except Exception as e:
logger.error(f"验证图形验证码失败: {e}")
return VerifyCaptchaResponse(
success=False,
message="图形验证码验证失败"
)
# 发送验证码接口(需要先验证图形验证码)
@app.post('/send-verification-code')
async def send_verification_code(request: SendCodeRequest):
from db_manager import db_manager
try:
# 检查是否已验证图形验证码
# 通过检查数据库中是否存在已验证的图形验证码记录
with db_manager.lock:
cursor = db_manager.conn.cursor()
current_time = time.time()
# 查找最近5分钟内该session_id的验证记录
# 由于验证成功后验证码会被删除,我们需要另一种方式来跟踪验证状态
# 这里我们检查该session_id是否在最近验证过通过检查是否有已删除的记录
# 为了简化,我们要求前端在验证图形验证码成功后立即发送邮件验证码
# 或者我们可以在验证成功后设置一个临时标记
pass
# 根据验证码类型进行不同的检查
if request.type == 'register':
# 注册验证码:检查邮箱是否已注册
existing_user = db_manager.get_user_by_email(request.email)
if existing_user:
return SendCodeResponse(
success=False,
message="该邮箱已被注册"
)
elif request.type == 'login':
# 登录验证码:检查邮箱是否存在
existing_user = db_manager.get_user_by_email(request.email)
if not existing_user:
return SendCodeResponse(
success=False,
message="该邮箱未注册"
)
# 生成验证码
code = db_manager.generate_verification_code()
# 保存验证码到数据库
if not db_manager.save_verification_code(request.email, code, request.type):
return SendCodeResponse(
success=False,
message="验证码保存失败,请稍后重试"
)
# 发送验证码邮件
if await db_manager.send_verification_email(request.email, code):
return SendCodeResponse(
success=True,
message="验证码已发送到您的邮箱,请查收"
)
else:
return SendCodeResponse(
success=False,
message="验证码发送失败,请检查邮箱地址或稍后重试"
)
except Exception as e:
logger.error(f"发送验证码失败: {e}")
return SendCodeResponse(
success=False,
message="发送验证码失败,请稍后重试"
)
# 用户注册接口
@app.post('/register')
async def register(request: RegisterRequest):
from db_manager import db_manager
try:
logger.info(f"{request.username}】尝试注册,邮箱: {request.email}")
# 验证邮箱验证码
if not db_manager.verify_email_code(request.email, request.verification_code):
logger.warning(f"{request.username}】注册失败: 验证码错误或已过期")
return RegisterResponse(
success=False,
message="验证码错误或已过期"
)
# 检查用户名是否已存在
existing_user = db_manager.get_user_by_username(request.username)
if existing_user:
logger.warning(f"{request.username}】注册失败: 用户名已存在")
return RegisterResponse(
success=False,
message="用户名已存在"
)
# 检查邮箱是否已注册
existing_email = db_manager.get_user_by_email(request.email)
if existing_email:
logger.warning(f"{request.username}】注册失败: 邮箱已被注册")
return RegisterResponse(
success=False,
message="该邮箱已被注册"
)
# 创建用户
if db_manager.create_user(request.username, request.email, request.password):
logger.info(f"{request.username}】注册成功")
return RegisterResponse(
success=True,
message="注册成功,请登录"
)
else:
logger.error(f"{request.username}】注册失败: 数据库操作失败")
return RegisterResponse(
success=False,
message="注册失败,请稍后重试"
)
except Exception as e:
logger.error(f"{request.username}】注册异常: {e}")
return RegisterResponse(
success=False,
message="注册失败,请稍后重试"
)
@app.post("/xianyu/reply", response_model=ResponseModel)
async def xianyu_reply(req: RequestModel):
msg_template = match_reply(req.cookie_id, req.send_message)
if not msg_template:
# 从数据库获取默认回复
from db_manager import db_manager
default_reply_settings = db_manager.get_default_reply(req.cookie_id)
if default_reply_settings and default_reply_settings.get('enabled', False):
msg_template = default_reply_settings.get('reply_content', '')
# 如果数据库中没有设置或为空,返回错误
if not msg_template:
raise HTTPException(status_code=404, detail="未找到匹配的回复规则且未设置默认回复")
# 按占位符格式化
try:
send_msg = msg_template.format(
send_user_id=req.send_user_id,
send_user_name=req.send_user_name,
send_message=req.send_message,
)
except Exception:
# 如果格式化失败,返回原始内容
send_msg = msg_template
return {"code": 200, "data": {"send_msg": send_msg}}
# ------------------------- 账号 / 关键字管理接口 -------------------------
class CookieIn(BaseModel):
id: str
value: str
class CookieStatusIn(BaseModel):
enabled: bool
class DefaultReplyIn(BaseModel):
enabled: bool
reply_content: Optional[str] = None
class NotificationChannelIn(BaseModel):
name: str
type: str = "qq"
config: str
class NotificationChannelUpdate(BaseModel):
name: str
config: str
enabled: bool = True
class MessageNotificationIn(BaseModel):
channel_id: int
enabled: bool = True
class SystemSettingIn(BaseModel):
key: str
value: str
description: Optional[str] = None
@app.get("/cookies")
def list_cookies(current_user: Dict[str, Any] = Depends(get_current_user)):
if cookie_manager.manager is None:
return []
# 获取当前用户的cookies
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
return list(user_cookies.keys())
@app.get("/cookies/details")
def get_cookies_details(current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取所有Cookie的详细信息包括值和状态"""
if cookie_manager.manager is None:
return []
# 获取当前用户的cookies
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
result = []
for cookie_id, cookie_value in user_cookies.items():
cookie_enabled = cookie_manager.manager.get_cookie_status(cookie_id)
auto_confirm = db_manager.get_auto_confirm(cookie_id)
result.append({
'id': cookie_id,
'value': cookie_value,
'enabled': cookie_enabled,
'auto_confirm': auto_confirm
})
return result
@app.post("/cookies")
def add_cookie(item: CookieIn, current_user: Dict[str, Any] = Depends(get_current_user)):
if cookie_manager.manager is None:
raise HTTPException(status_code=500, detail="CookieManager 未就绪")
try:
# 添加cookie时绑定到当前用户
user_id = current_user['user_id']
from db_manager import db_manager
log_with_user('info', f"尝试添加Cookie: {item.id}, 当前用户ID: {user_id}, 用户名: {current_user.get('username', 'unknown')}", current_user)
# 检查cookie是否已存在且属于其他用户
existing_cookies = db_manager.get_all_cookies()
if item.id in existing_cookies:
# 检查是否属于当前用户
user_cookies = db_manager.get_all_cookies(user_id)
if item.id not in user_cookies:
log_with_user('warning', f"Cookie ID冲突: {item.id} 已被其他用户使用", current_user)
raise HTTPException(status_code=400, detail="该Cookie ID已被其他用户使用")
# 保存到数据库时指定用户ID
db_manager.save_cookie(item.id, item.value, user_id)
# 添加到CookieManager同时指定用户ID
cookie_manager.manager.add_cookie(item.id, item.value, user_id=user_id)
log_with_user('info', f"Cookie添加成功: {item.id}", current_user)
return {"msg": "success"}
except HTTPException:
raise
except Exception as e:
log_with_user('error', f"添加Cookie失败: {item.id} - {str(e)}", current_user)
raise HTTPException(status_code=400, detail=str(e))
@app.put('/cookies/{cid}')
def update_cookie(cid: str, item: CookieIn, current_user: Dict[str, Any] = Depends(get_current_user)):
if cookie_manager.manager is None:
raise HTTPException(status_code=500, detail='CookieManager 未就绪')
try:
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
if cid not in user_cookies:
raise HTTPException(status_code=403, detail="无权限操作该Cookie")
# 更新cookie时保持用户绑定
db_manager.save_cookie(cid, item.value, user_id)
cookie_manager.manager.update_cookie(cid, item.value)
return {'msg': 'updated'}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.put('/cookies/{cid}/status')
def update_cookie_status(cid: str, status_data: CookieStatusIn, current_user: Dict[str, Any] = Depends(get_current_user)):
"""更新账号的启用/禁用状态"""
if cookie_manager.manager is None:
raise HTTPException(status_code=500, detail='CookieManager 未就绪')
try:
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
if cid not in user_cookies:
raise HTTPException(status_code=403, detail="无权限操作该Cookie")
cookie_manager.manager.update_cookie_status(cid, status_data.enabled)
return {'msg': 'status updated', 'enabled': status_data.enabled}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
# ------------------------- 默认回复管理接口 -------------------------
@app.get('/default-replies/{cid}')
def get_default_reply(cid: str, current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取指定账号的默认回复设置"""
from db_manager import db_manager
try:
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
user_cookies = db_manager.get_all_cookies(user_id)
if cid not in user_cookies:
raise HTTPException(status_code=403, detail="无权限访问该Cookie")
result = db_manager.get_default_reply(cid)
if result is None:
# 如果没有设置,返回默认值
return {'enabled': False, 'reply_content': ''}
return result
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.put('/default-replies/{cid}')
def update_default_reply(cid: str, reply_data: DefaultReplyIn, current_user: Dict[str, Any] = Depends(get_current_user)):
"""更新指定账号的默认回复设置"""
from db_manager import db_manager
try:
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
user_cookies = db_manager.get_all_cookies(user_id)
if cid not in user_cookies:
raise HTTPException(status_code=403, detail="无权限操作该Cookie")
db_manager.save_default_reply(cid, reply_data.enabled, reply_data.reply_content)
return {'msg': 'default reply updated', 'enabled': reply_data.enabled}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get('/default-replies')
def get_all_default_replies(current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取当前用户所有账号的默认回复设置"""
from db_manager import db_manager
try:
# 只返回当前用户的默认回复设置
user_id = current_user['user_id']
user_cookies = db_manager.get_all_cookies(user_id)
all_replies = db_manager.get_all_default_replies()
# 过滤只属于当前用户的回复设置
user_replies = {cid: reply for cid, reply in all_replies.items() if cid in user_cookies}
return user_replies
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.delete('/default-replies/{cid}')
def delete_default_reply(cid: str, current_user: Dict[str, Any] = Depends(get_current_user)):
"""删除指定账号的默认回复设置"""
from db_manager import db_manager
try:
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
user_cookies = db_manager.get_all_cookies(user_id)
if cid not in user_cookies:
raise HTTPException(status_code=403, detail="无权限操作该Cookie")
success = db_manager.delete_default_reply(cid)
if success:
return {'msg': 'default reply deleted'}
else:
raise HTTPException(status_code=400, detail='删除失败')
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ------------------------- 通知渠道管理接口 -------------------------
@app.get('/notification-channels')
def get_notification_channels(current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取所有通知渠道"""
from db_manager import db_manager
try:
user_id = current_user['user_id']
return db_manager.get_notification_channels(user_id)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post('/notification-channels')
def create_notification_channel(channel_data: NotificationChannelIn, current_user: Dict[str, Any] = Depends(get_current_user)):
"""创建通知渠道"""
from db_manager import db_manager
try:
user_id = current_user['user_id']
channel_id = db_manager.create_notification_channel(
channel_data.name,
channel_data.type,
channel_data.config,
user_id
)
return {'msg': 'notification channel created', 'id': channel_id}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get('/notification-channels/{channel_id}')
def get_notification_channel(channel_id: int, _: None = Depends(require_auth)):
"""获取指定通知渠道"""
from db_manager import db_manager
try:
channel = db_manager.get_notification_channel(channel_id)
if not channel:
raise HTTPException(status_code=404, detail='通知渠道不存在')
return channel
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.put('/notification-channels/{channel_id}')
def update_notification_channel(channel_id: int, channel_data: NotificationChannelUpdate, _: None = Depends(require_auth)):
"""更新通知渠道"""
from db_manager import db_manager
try:
success = db_manager.update_notification_channel(
channel_id,
channel_data.name,
channel_data.config,
channel_data.enabled
)
if success:
return {'msg': 'notification channel updated'}
else:
raise HTTPException(status_code=404, detail='通知渠道不存在')
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.delete('/notification-channels/{channel_id}')
def delete_notification_channel(channel_id: int, _: None = Depends(require_auth)):
"""删除通知渠道"""
from db_manager import db_manager
try:
success = db_manager.delete_notification_channel(channel_id)
if success:
return {'msg': 'notification channel deleted'}
else:
raise HTTPException(status_code=404, detail='通知渠道不存在')
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ------------------------- 消息通知配置接口 -------------------------
@app.get('/message-notifications')
def get_all_message_notifications(current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取当前用户所有账号的消息通知配置"""
from db_manager import db_manager
try:
# 只返回当前用户的消息通知配置
user_id = current_user['user_id']
user_cookies = db_manager.get_all_cookies(user_id)
all_notifications = db_manager.get_all_message_notifications()
# 过滤只属于当前用户的通知配置
user_notifications = {cid: notifications for cid, notifications in all_notifications.items() if cid in user_cookies}
return user_notifications
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get('/message-notifications/{cid}')
def get_account_notifications(cid: str, current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取指定账号的消息通知配置"""
from db_manager import db_manager
try:
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
user_cookies = db_manager.get_all_cookies(user_id)
if cid not in user_cookies:
raise HTTPException(status_code=403, detail="无权限访问该Cookie")
return db_manager.get_account_notifications(cid)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post('/message-notifications/{cid}')
def set_message_notification(cid: str, notification_data: MessageNotificationIn, current_user: Dict[str, Any] = Depends(get_current_user)):
"""设置账号的消息通知"""
from db_manager import db_manager
try:
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
user_cookies = db_manager.get_all_cookies(user_id)
if cid not in user_cookies:
raise HTTPException(status_code=403, detail="无权限操作该Cookie")
# 检查通知渠道是否存在
channel = db_manager.get_notification_channel(notification_data.channel_id)
if not channel:
raise HTTPException(status_code=404, detail='通知渠道不存在')
success = db_manager.set_message_notification(cid, notification_data.channel_id, notification_data.enabled)
if success:
return {'msg': 'message notification set'}
else:
raise HTTPException(status_code=400, detail='设置失败')
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.delete('/message-notifications/account/{cid}')
def delete_account_notifications(cid: str, _: None = Depends(require_auth)):
"""删除账号的所有消息通知配置"""
from db_manager import db_manager
try:
success = db_manager.delete_account_notifications(cid)
if success:
return {'msg': 'account notifications deleted'}
else:
raise HTTPException(status_code=404, detail='账号通知配置不存在')
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.delete('/message-notifications/{notification_id}')
def delete_message_notification(notification_id: int, _: None = Depends(require_auth)):
"""删除消息通知配置"""
from db_manager import db_manager
try:
success = db_manager.delete_message_notification(notification_id)
if success:
return {'msg': 'message notification deleted'}
else:
raise HTTPException(status_code=404, detail='通知配置不存在')
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ------------------------- 系统设置接口 -------------------------
@app.get('/system-settings')
def get_system_settings(_: None = Depends(require_auth)):
"""获取系统设置(排除敏感信息)"""
from db_manager import db_manager
try:
settings = db_manager.get_all_system_settings()
# 移除敏感信息
if 'admin_password_hash' in settings:
del settings['admin_password_hash']
return settings
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.put('/system-settings/{key}')
def update_system_setting(key: str, setting_data: SystemSettingIn, _: None = Depends(require_auth)):
"""更新系统设置"""
from db_manager import db_manager
try:
# 禁止直接修改密码哈希
if key == 'admin_password_hash':
raise HTTPException(status_code=400, detail='请使用密码修改接口')
success = db_manager.set_system_setting(key, setting_data.value, setting_data.description)
if success:
return {'msg': 'system setting updated'}
else:
raise HTTPException(status_code=400, detail='更新失败')
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/cookies/{cid}")
def remove_cookie(cid: str, current_user: Dict[str, Any] = Depends(get_current_user)):
if cookie_manager.manager is None:
raise HTTPException(status_code=500, detail="CookieManager 未就绪")
try:
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
if cid not in user_cookies:
raise HTTPException(status_code=403, detail="无权限操作该Cookie")
cookie_manager.manager.remove_cookie(cid)
return {"msg": "removed"}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
class AutoConfirmUpdate(BaseModel):
auto_confirm: bool
@app.put("/cookies/{cid}/auto-confirm")
def update_auto_confirm(cid: str, update_data: AutoConfirmUpdate, current_user: Dict[str, Any] = Depends(get_current_user)):
"""更新账号的自动确认发货设置"""
if cookie_manager.manager is None:
raise HTTPException(status_code=500, detail="CookieManager 未就绪")
try:
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
if cid not in user_cookies:
raise HTTPException(status_code=403, detail="无权限操作该Cookie")
# 更新数据库中的auto_confirm设置
success = db_manager.update_auto_confirm(cid, update_data.auto_confirm)
if not success:
raise HTTPException(status_code=500, detail="更新自动确认发货设置失败")
# 通知CookieManager更新设置如果账号正在运行
if hasattr(cookie_manager.manager, 'update_auto_confirm_setting'):
cookie_manager.manager.update_auto_confirm_setting(cid, update_data.auto_confirm)
return {
"msg": "success",
"auto_confirm": update_data.auto_confirm,
"message": f"自动确认发货已{'开启' if update_data.auto_confirm else '关闭'}"
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/cookies/{cid}/auto-confirm")
def get_auto_confirm(cid: str, current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取账号的自动确认发货设置"""
if cookie_manager.manager is None:
raise HTTPException(status_code=500, detail="CookieManager 未就绪")
try:
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
if cid not in user_cookies:
raise HTTPException(status_code=403, detail="无权限操作该Cookie")
# 获取auto_confirm设置
auto_confirm = db_manager.get_auto_confirm(cid)
return {
"auto_confirm": auto_confirm,
"message": f"自动确认发货当前{'开启' if auto_confirm else '关闭'}"
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
class KeywordIn(BaseModel):
keywords: Dict[str, str] # key -> reply
class KeywordWithItemIdIn(BaseModel):
keywords: List[Dict[str, Any]] # [{"keyword": str, "reply": str, "item_id": str}]
@app.get("/keywords/{cid}")
def get_keywords(cid: str, current_user: Dict[str, Any] = Depends(get_current_user)):
if cookie_manager.manager is None:
raise HTTPException(status_code=500, detail="CookieManager 未就绪")
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
if cid not in user_cookies:
raise HTTPException(status_code=403, detail="无权限访问该Cookie")
return cookie_manager.manager.get_keywords(cid)
@app.get("/keywords-with-item-id/{cid}")
def get_keywords_with_item_id(cid: str, current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取包含商品ID的关键词列表"""
if cookie_manager.manager is None:
raise HTTPException(status_code=500, detail="CookieManager 未就绪")
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
if cid not in user_cookies:
raise HTTPException(status_code=403, detail="无权限访问该Cookie")
# 获取包含商品ID的关键词
keywords = db_manager.get_keywords_with_item_id(cid)
# 转换为前端需要的格式
result = []
for keyword, reply, item_id in keywords:
result.append({
"keyword": keyword,
"reply": reply,
"item_id": item_id or ""
})
return result
@app.post("/keywords/{cid}")
def update_keywords(cid: str, body: KeywordIn, current_user: Dict[str, Any] = Depends(get_current_user)):
if cookie_manager.manager is None:
raise HTTPException(status_code=500, detail="CookieManager 未就绪")
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
if cid not in user_cookies:
log_with_user('warning', f"尝试操作其他用户的Cookie关键字: {cid}", current_user)
raise HTTPException(status_code=403, detail="无权限操作该Cookie")
kw_list = [(k, v) for k, v in body.keywords.items()]
log_with_user('info', f"更新Cookie关键字: {cid}, 数量: {len(kw_list)}", current_user)
cookie_manager.manager.update_keywords(cid, kw_list)
log_with_user('info', f"Cookie关键字更新成功: {cid}", current_user)
return {"msg": "updated", "count": len(kw_list)}
@app.post("/keywords-with-item-id/{cid}")
def update_keywords_with_item_id(cid: str, body: KeywordWithItemIdIn, current_user: Dict[str, Any] = Depends(get_current_user)):
"""更新包含商品ID的关键词列表"""
if cookie_manager.manager is None:
raise HTTPException(status_code=500, detail="CookieManager 未就绪")
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
if cid not in user_cookies:
log_with_user('warning', f"尝试操作其他用户的Cookie关键字: {cid}", current_user)
raise HTTPException(status_code=403, detail="无权限操作该Cookie")
# 验证数据格式
keywords_to_save = []
keyword_set = set() # 用于检查当前提交的关键词中是否有重复
for kw_data in body.keywords:
keyword = kw_data.get('keyword', '').strip()
reply = kw_data.get('reply', '').strip()
item_id = kw_data.get('item_id', '').strip() or None
if not keyword or not reply:
raise HTTPException(status_code=400, detail="关键词和回复内容不能为空")
# 检查当前提交的关键词中是否有重复
keyword_key = f"{keyword}|{item_id or ''}"
if keyword_key in keyword_set:
item_id_text = f"商品ID: {item_id}" if item_id else "(通用关键词)"
raise HTTPException(status_code=400, detail=f"关键词 '{keyword}' {item_id_text} 在当前提交中重复")
keyword_set.add(keyword_key)
keywords_to_save.append((keyword, reply, item_id))
# 保存关键词
success = db_manager.save_keywords_with_item_id(cid, keywords_to_save)
if not success:
raise HTTPException(status_code=500, detail="保存关键词失败")
log_with_user('info', f"更新Cookie关键字(含商品ID): {cid}, 数量: {len(keywords_to_save)}", current_user)
return {"msg": "updated", "count": len(keywords_to_save)}
@app.get("/items/{cid}")
def get_items_list(cid: str, current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取指定账号的商品列表"""
if cookie_manager.manager is None:
raise HTTPException(status_code=500, detail="CookieManager 未就绪")
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
if cid not in user_cookies:
raise HTTPException(status_code=403, detail="无权限访问该Cookie")
try:
# 获取该账号的所有商品
with db_manager.lock:
cursor = db_manager.conn.cursor()
cursor.execute('''
SELECT item_id, item_title, item_price, created_at
FROM item_info
WHERE cookie_id = ?
ORDER BY created_at DESC
''', (cid,))
items = []
for row in cursor.fetchall():
items.append({
'item_id': row[0],
'item_title': row[1] or '未知商品',
'item_price': row[2] or '价格未知',
'created_at': row[3]
})
return {"items": items, "count": len(items)}
except Exception as e:
logger.error(f"获取商品列表失败: {e}")
raise HTTPException(status_code=500, detail="获取商品列表失败")
@app.get("/keywords-export/{cid}")
def export_keywords(cid: str, current_user: Dict[str, Any] = Depends(get_current_user)):
"""导出指定账号的关键词为Excel文件"""
if cookie_manager.manager is None:
raise HTTPException(status_code=500, detail="CookieManager 未就绪")
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
if cid not in user_cookies:
raise HTTPException(status_code=403, detail="无权限访问该Cookie")
try:
# 获取关键词数据
keywords = db_manager.get_keywords_with_item_id(cid)
# 创建DataFrame
data = []
for keyword, reply, item_id in keywords:
data.append({
'关键词': keyword,
'商品ID': item_id or '',
'关键词内容': reply
})
# 如果没有数据创建空的DataFrame但保留列名作为模板
if not data:
df = pd.DataFrame(columns=['关键词', '商品ID', '关键词内容'])
else:
df = pd.DataFrame(data)
# 创建Excel文件
output = io.BytesIO()
with pd.ExcelWriter(output, engine='openpyxl') as writer:
df.to_excel(writer, sheet_name='关键词数据', index=False)
# 如果是空模板,添加一些示例说明
if data == []:
worksheet = writer.sheets['关键词数据']
# 添加示例数据作为注释从第2行开始
worksheet['A2'] = '你好'
worksheet['B2'] = ''
worksheet['C2'] = '您好!欢迎咨询,有什么可以帮助您的吗?'
worksheet['A3'] = '价格'
worksheet['B3'] = '123456'
worksheet['C3'] = '这个商品的价格是99元现在有优惠活动哦'
worksheet['A4'] = '发货'
worksheet['B4'] = ''
worksheet['C4'] = '我们会在24小时内发货请耐心等待。'
# 设置示例行的样式(浅灰色背景)
from openpyxl.styles import PatternFill
gray_fill = PatternFill(start_color='F0F0F0', end_color='F0F0F0', fill_type='solid')
for row in range(2, 5):
for col in range(1, 4):
worksheet.cell(row=row, column=col).fill = gray_fill
output.seek(0)
# 生成文件名使用URL编码处理中文
from urllib.parse import quote
if not data:
filename = f"keywords_template_{cid}_{int(time.time())}.xlsx"
else:
filename = f"keywords_{cid}_{int(time.time())}.xlsx"
encoded_filename = quote(filename.encode('utf-8'))
# 返回文件
return StreamingResponse(
io.BytesIO(output.read()),
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={
"Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"
}
)
except Exception as e:
logger.error(f"导出关键词失败: {e}")
raise HTTPException(status_code=500, detail=f"导出关键词失败: {str(e)}")
@app.post("/keywords-import/{cid}")
async def import_keywords(cid: str, file: UploadFile = File(...), current_user: Dict[str, Any] = Depends(get_current_user)):
"""导入Excel文件中的关键词到指定账号"""
if cookie_manager.manager is None:
raise HTTPException(status_code=500, detail="CookieManager 未就绪")
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
if cid not in user_cookies:
raise HTTPException(status_code=403, detail="无权限访问该Cookie")
# 检查文件类型
if not file.filename.endswith(('.xlsx', '.xls')):
raise HTTPException(status_code=400, detail="请上传Excel文件(.xlsx或.xls)")
try:
# 读取Excel文件
contents = await file.read()
df = pd.read_excel(io.BytesIO(contents))
# 检查必要的列
required_columns = ['关键词', '商品ID', '关键词内容']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
raise HTTPException(status_code=400, detail=f"Excel文件缺少必要的列: {', '.join(missing_columns)}")
# 获取现有关键词
existing_keywords = db_manager.get_keywords_with_item_id(cid)
existing_dict = {}
for keyword, reply, item_id in existing_keywords:
key = f"{keyword}|{item_id or ''}"
existing_dict[key] = (keyword, reply, item_id)
# 处理导入数据
import_data = []
update_count = 0
add_count = 0
for index, row in df.iterrows():
keyword = str(row['关键词']).strip()
item_id = str(row['商品ID']).strip() if pd.notna(row['商品ID']) and str(row['商品ID']).strip() else None
reply = str(row['关键词内容']).strip()
if not keyword or not reply:
continue # 跳过空行
# 检查是否重复
key = f"{keyword}|{item_id or ''}"
if key in existing_dict:
# 更新现有关键词
update_count += 1
else:
# 新增关键词
add_count += 1
import_data.append((keyword, reply, item_id))
if not import_data:
raise HTTPException(status_code=400, detail="Excel文件中没有有效的关键词数据")
# 保存到数据库
success = db_manager.save_keywords_with_item_id(cid, import_data)
if not success:
raise HTTPException(status_code=500, detail="保存关键词到数据库失败")
log_with_user('info', f"导入关键词成功: {cid}, 新增: {add_count}, 更新: {update_count}", current_user)
return {
"msg": "导入成功",
"total": len(import_data),
"added": add_count,
"updated": update_count
}
except pd.errors.EmptyDataError:
raise HTTPException(status_code=400, detail="Excel文件为空")
except pd.errors.ParserError:
raise HTTPException(status_code=400, detail="Excel文件格式错误")
except Exception as e:
logger.error(f"导入关键词失败: {e}")
raise HTTPException(status_code=500, detail=f"导入关键词失败: {str(e)}")
# 卡券管理API
@app.get("/cards")
def get_cards(current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取当前用户的卡券列表"""
try:
from db_manager import db_manager
user_id = current_user['user_id']
cards = db_manager.get_all_cards(user_id)
return cards
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/cards")
def create_card(card_data: dict, current_user: Dict[str, Any] = Depends(get_current_user)):
"""创建新卡券"""
try:
from db_manager import db_manager
user_id = current_user['user_id']
card_name = card_data.get('name', '未命名卡券')
log_with_user('info', f"创建卡券: {card_name}", current_user)
# 验证多规格字段
is_multi_spec = card_data.get('is_multi_spec', False)
if is_multi_spec:
if not card_data.get('spec_name') or not card_data.get('spec_value'):
raise HTTPException(status_code=400, detail="多规格卡券必须提供规格名称和规格值")
card_id = db_manager.create_card(
name=card_data.get('name'),
card_type=card_data.get('type'),
api_config=card_data.get('api_config'),
text_content=card_data.get('text_content'),
data_content=card_data.get('data_content'),
description=card_data.get('description'),
enabled=card_data.get('enabled', True),
delay_seconds=card_data.get('delay_seconds', 0),
is_multi_spec=is_multi_spec,
spec_name=card_data.get('spec_name') if is_multi_spec else None,
spec_value=card_data.get('spec_value') if is_multi_spec else None,
user_id=user_id
)
log_with_user('info', f"卡券创建成功: {card_name} (ID: {card_id})", current_user)
return {"id": card_id, "message": "卡券创建成功"}
except Exception as e:
log_with_user('error', f"创建卡券失败: {card_data.get('name', '未知')} - {str(e)}", current_user)
raise HTTPException(status_code=500, detail=str(e))
@app.get("/cards/{card_id}")
def get_card(card_id: int, current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取单个卡券详情"""
try:
from db_manager import db_manager
user_id = current_user['user_id']
card = db_manager.get_card_by_id(card_id, user_id)
if card:
return card
else:
raise HTTPException(status_code=404, detail="卡券不存在")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.put("/cards/{card_id}")
def update_card(card_id: int, card_data: dict, _: None = Depends(require_auth)):
"""更新卡券"""
try:
from db_manager import db_manager
# 验证多规格字段
is_multi_spec = card_data.get('is_multi_spec')
if is_multi_spec:
if not card_data.get('spec_name') or not card_data.get('spec_value'):
raise HTTPException(status_code=400, detail="多规格卡券必须提供规格名称和规格值")
success = db_manager.update_card(
card_id=card_id,
name=card_data.get('name'),
card_type=card_data.get('type'),
api_config=card_data.get('api_config'),
text_content=card_data.get('text_content'),
data_content=card_data.get('data_content'),
description=card_data.get('description'),
enabled=card_data.get('enabled', True),
delay_seconds=card_data.get('delay_seconds'),
is_multi_spec=is_multi_spec,
spec_name=card_data.get('spec_name'),
spec_value=card_data.get('spec_value')
)
if success:
return {"message": "卡券更新成功"}
else:
raise HTTPException(status_code=404, detail="卡券不存在")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# 自动发货规则API
@app.get("/delivery-rules")
def get_delivery_rules(current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取发货规则列表"""
try:
from db_manager import db_manager
user_id = current_user['user_id']
rules = db_manager.get_all_delivery_rules(user_id)
return rules
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/delivery-rules")
def create_delivery_rule(rule_data: dict, current_user: Dict[str, Any] = Depends(get_current_user)):
"""创建新发货规则"""
try:
from db_manager import db_manager
user_id = current_user['user_id']
rule_id = db_manager.create_delivery_rule(
keyword=rule_data.get('keyword'),
card_id=rule_data.get('card_id'),
delivery_count=rule_data.get('delivery_count', 1),
enabled=rule_data.get('enabled', True),
description=rule_data.get('description'),
user_id=user_id
)
return {"id": rule_id, "message": "发货规则创建成功"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/delivery-rules/{rule_id}")
def get_delivery_rule(rule_id: int, current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取单个发货规则详情"""
try:
from db_manager import db_manager
user_id = current_user['user_id']
rule = db_manager.get_delivery_rule_by_id(rule_id, user_id)
if rule:
return rule
else:
raise HTTPException(status_code=404, detail="发货规则不存在")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.put("/delivery-rules/{rule_id}")
def update_delivery_rule(rule_id: int, rule_data: dict, current_user: Dict[str, Any] = Depends(get_current_user)):
"""更新发货规则"""
try:
from db_manager import db_manager
user_id = current_user['user_id']
success = db_manager.update_delivery_rule(
rule_id=rule_id,
keyword=rule_data.get('keyword'),
card_id=rule_data.get('card_id'),
delivery_count=rule_data.get('delivery_count', 1),
enabled=rule_data.get('enabled', True),
description=rule_data.get('description'),
user_id=user_id
)
if success:
return {"message": "发货规则更新成功"}
else:
raise HTTPException(status_code=404, detail="发货规则不存在")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/cards/{card_id}")
def delete_card(card_id: int, _: None = Depends(require_auth)):
"""删除卡券"""
try:
from db_manager import db_manager
success = db_manager.delete_card(card_id)
if success:
return {"message": "卡券删除成功"}
else:
raise HTTPException(status_code=404, detail="卡券不存在")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/delivery-rules/{rule_id}")
def delete_delivery_rule(rule_id: int, current_user: Dict[str, Any] = Depends(get_current_user)):
"""删除发货规则"""
try:
from db_manager import db_manager
user_id = current_user['user_id']
success = db_manager.delete_delivery_rule(rule_id, user_id)
if success:
return {"message": "发货规则删除成功"}
else:
raise HTTPException(status_code=404, detail="发货规则不存在")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ==================== 备份和恢复 API ====================
@app.get("/backup/export")
def export_backup(current_user: Dict[str, Any] = Depends(get_current_user)):
"""导出用户备份"""
try:
from db_manager import db_manager
user_id = current_user['user_id']
username = current_user['username']
# 导出当前用户的数据
backup_data = db_manager.export_backup(user_id)
# 生成文件名
import datetime
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"xianyu_backup_{username}_{timestamp}.json"
# 返回JSON响应设置下载头
response = JSONResponse(content=backup_data)
response.headers["Content-Disposition"] = f"attachment; filename={filename}"
response.headers["Content-Type"] = "application/json"
return response
except Exception as e:
raise HTTPException(status_code=500, detail=f"导出备份失败: {str(e)}")
@app.post("/backup/import")
def import_backup(file: UploadFile = File(...), current_user: Dict[str, Any] = Depends(get_current_user)):
"""导入用户备份"""
try:
# 验证文件类型
if not file.filename.endswith('.json'):
raise HTTPException(status_code=400, detail="只支持JSON格式的备份文件")
# 读取文件内容
content = file.file.read()
backup_data = json.loads(content.decode('utf-8'))
# 导入备份到当前用户
from db_manager import db_manager
user_id = current_user['user_id']
success = db_manager.import_backup(backup_data, user_id)
if success:
# 备份导入成功后,刷新 CookieManager 的内存缓存
import cookie_manager
if cookie_manager.manager:
try:
cookie_manager.manager.reload_from_db()
logger.info("备份导入后已刷新 CookieManager 缓存")
except Exception as e:
logger.error(f"刷新 CookieManager 缓存失败: {e}")
return {"message": "备份导入成功"}
else:
raise HTTPException(status_code=400, detail="备份导入失败")
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="备份文件格式无效")
except Exception as e:
raise HTTPException(status_code=500, detail=f"导入备份失败: {str(e)}")
@app.post("/system/reload-cache")
def reload_cache(_: None = Depends(require_auth)):
"""重新加载系统缓存(用于手动刷新数据)"""
try:
import cookie_manager
if cookie_manager.manager:
success = cookie_manager.manager.reload_from_db()
if success:
return {"message": "系统缓存已刷新", "success": True}
else:
raise HTTPException(status_code=500, detail="缓存刷新失败")
else:
raise HTTPException(status_code=500, detail="CookieManager 未初始化")
except Exception as e:
raise HTTPException(status_code=500, detail=f"刷新缓存失败: {str(e)}")
# ==================== 商品管理 API ====================
@app.get("/items")
def get_all_items(current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取当前用户的所有商品信息"""
try:
# 只返回当前用户的商品信息
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
all_items = []
for cookie_id in user_cookies.keys():
items = db_manager.get_items_by_cookie(cookie_id)
all_items.extend(items)
return {"items": all_items}
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取商品信息失败: {str(e)}")
# ==================== 商品搜索 API ====================
class ItemSearchRequest(BaseModel):
keyword: str
page: int = 1
page_size: int = 20
class ItemSearchMultipleRequest(BaseModel):
keyword: str
total_pages: int = 1
@app.post("/items/search")
async def search_items(
search_request: ItemSearchRequest,
current_user: Optional[Dict[str, Any]] = Depends(get_current_user_optional)
):
"""搜索闲鱼商品"""
try:
from utils.item_search import search_xianyu_items
# 执行搜索
result = await search_xianyu_items(
keyword=search_request.keyword,
page=search_request.page,
page_size=search_request.page_size
)
return {
"success": True,
"data": result.get("items", []),
"total": result.get("total", 0),
"page": search_request.page,
"page_size": search_request.page_size,
"keyword": search_request.keyword
}
except Exception as e:
logger.error(f"商品搜索失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"商品搜索失败: {str(e)}")
@app.post("/items/search_multiple")
async def search_multiple_pages(
search_request: ItemSearchMultipleRequest,
current_user: Optional[Dict[str, Any]] = Depends(get_current_user_optional)
):
"""搜索多页闲鱼商品"""
try:
from utils.item_search import search_multiple_pages_xianyu
# 执行多页搜索
result = await search_multiple_pages_xianyu(
keyword=search_request.keyword,
total_pages=search_request.total_pages
)
return {
"success": True,
"data": result.get("items", []),
"total": result.get("total", 0),
"total_pages": search_request.total_pages,
"keyword": search_request.keyword,
"is_real_data": result.get("is_real_data", False),
"is_fallback": result.get("is_fallback", False),
"source": result.get("source", "unknown")
}
except Exception as e:
logger.error(f"多页商品搜索失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"多页商品搜索失败: {str(e)}")
@app.get("/items/detail/{item_id}")
async def get_public_item_detail(
item_id: str,
current_user: Optional[Dict[str, Any]] = Depends(get_current_user_optional)
):
"""获取公开商品详情通过外部API"""
try:
from utils.item_search import get_item_detail_from_api
# 从外部API获取商品详情
detail = await get_item_detail_from_api(item_id)
if detail:
return {
"success": True,
"data": detail
}
else:
raise HTTPException(status_code=404, detail="商品详情获取失败")
except HTTPException:
raise
except Exception as e:
logger.error(f"获取商品详情失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"获取商品详情失败: {str(e)}")
@app.get("/items/cookie/{cookie_id}")
def get_items_by_cookie(cookie_id: str, current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取指定Cookie的商品信息"""
try:
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
if cookie_id not in user_cookies:
raise HTTPException(status_code=403, detail="无权限访问该Cookie")
items = db_manager.get_items_by_cookie(cookie_id)
return {"items": items}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取商品信息失败: {str(e)}")
@app.get("/items/{cookie_id}/{item_id}")
def get_item_detail(cookie_id: str, item_id: str, current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取商品详情"""
try:
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
if cookie_id not in user_cookies:
raise HTTPException(status_code=403, detail="无权限访问该Cookie")
item = db_manager.get_item_info(cookie_id, item_id)
if not item:
raise HTTPException(status_code=404, detail="商品不存在")
return {"item": item}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取商品详情失败: {str(e)}")
class ItemDetailUpdate(BaseModel):
item_detail: str
@app.put("/items/{cookie_id}/{item_id}")
def update_item_detail(
cookie_id: str,
item_id: str,
update_data: ItemDetailUpdate,
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""更新商品详情"""
try:
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
if cookie_id not in user_cookies:
raise HTTPException(status_code=403, detail="无权限操作该Cookie")
success = db_manager.update_item_detail(cookie_id, item_id, update_data.item_detail)
if success:
return {"message": "商品详情更新成功"}
else:
raise HTTPException(status_code=400, detail="更新失败")
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"更新商品详情失败: {str(e)}")
@app.delete("/items/{cookie_id}/{item_id}")
def delete_item_info(
cookie_id: str,
item_id: str,
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""删除商品信息"""
try:
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
if cookie_id not in user_cookies:
raise HTTPException(status_code=403, detail="无权限操作该Cookie")
success = db_manager.delete_item_info(cookie_id, item_id)
if success:
return {"message": "商品信息删除成功"}
else:
raise HTTPException(status_code=404, detail="商品信息不存在")
except HTTPException:
raise
except Exception as e:
logger.error(f"删除商品信息异常: {e}")
raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}")
class BatchDeleteRequest(BaseModel):
items: List[dict] # [{"cookie_id": "xxx", "item_id": "yyy"}, ...]
class AIReplySettings(BaseModel):
ai_enabled: bool
model_name: str = "qwen-plus"
api_key: str = ""
base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1"
max_discount_percent: int = 10
max_discount_amount: int = 100
max_bargain_rounds: int = 3
custom_prompts: str = ""
@app.delete("/items/batch")
def batch_delete_items(
request: BatchDeleteRequest,
_: None = Depends(require_auth)
):
"""批量删除商品信息"""
try:
if not request.items:
raise HTTPException(status_code=400, detail="删除列表不能为空")
success_count = db_manager.batch_delete_item_info(request.items)
total_count = len(request.items)
return {
"message": f"批量删除完成",
"success_count": success_count,
"total_count": total_count,
"failed_count": total_count - success_count
}
except Exception as e:
logger.error(f"批量删除商品信息异常: {e}")
raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}")
# ==================== AI回复管理API ====================
@app.get("/ai-reply-settings/{cookie_id}")
def get_ai_reply_settings(cookie_id: str, current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取指定账号的AI回复设置"""
try:
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
if cookie_id not in user_cookies:
raise HTTPException(status_code=403, detail="无权限访问该Cookie")
settings = db_manager.get_ai_reply_settings(cookie_id)
return settings
except HTTPException:
raise
except Exception as e:
logger.error(f"获取AI回复设置异常: {e}")
raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}")
@app.put("/ai-reply-settings/{cookie_id}")
def update_ai_reply_settings(cookie_id: str, settings: AIReplySettings, current_user: Dict[str, Any] = Depends(get_current_user)):
"""更新指定账号的AI回复设置"""
try:
# 检查cookie是否属于当前用户
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
if cookie_id not in user_cookies:
raise HTTPException(status_code=403, detail="无权限操作该Cookie")
# 检查账号是否存在
if cookie_manager.manager is None:
raise HTTPException(status_code=500, detail='CookieManager 未就绪')
# 保存设置
settings_dict = settings.dict()
success = db_manager.save_ai_reply_settings(cookie_id, settings_dict)
if success:
# 清理客户端缓存,强制重新创建
ai_reply_engine.clear_client_cache(cookie_id)
# 如果启用了AI回复记录日志
if settings.ai_enabled:
logger.info(f"账号 {cookie_id} 启用AI回复")
else:
logger.info(f"账号 {cookie_id} 禁用AI回复")
return {"message": "AI回复设置更新成功"}
else:
raise HTTPException(status_code=400, detail="更新失败")
except HTTPException:
raise
except Exception as e:
logger.error(f"更新AI回复设置异常: {e}")
raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}")
@app.get("/ai-reply-settings")
def get_all_ai_reply_settings(current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取当前用户所有账号的AI回复设置"""
try:
# 只返回当前用户的AI回复设置
user_id = current_user['user_id']
from db_manager import db_manager
user_cookies = db_manager.get_all_cookies(user_id)
all_settings = db_manager.get_all_ai_reply_settings()
# 过滤只属于当前用户的设置
user_settings = {cid: settings for cid, settings in all_settings.items() if cid in user_cookies}
return user_settings
except Exception as e:
logger.error(f"获取所有AI回复设置异常: {e}")
raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}")
@app.post("/ai-reply-test/{cookie_id}")
def test_ai_reply(cookie_id: str, test_data: dict, _: None = Depends(require_auth)):
"""测试AI回复功能"""
try:
# 检查账号是否存在
if cookie_manager.manager is None:
raise HTTPException(status_code=500, detail='CookieManager 未就绪')
if cookie_id not in cookie_manager.manager.cookies:
raise HTTPException(status_code=404, detail='账号不存在')
# 检查是否启用AI回复
if not ai_reply_engine.is_ai_enabled(cookie_id):
raise HTTPException(status_code=400, detail='该账号未启用AI回复')
# 构造测试数据
test_message = test_data.get('message', '你好')
test_item_info = {
'title': test_data.get('item_title', '测试商品'),
'price': test_data.get('item_price', 100),
'desc': test_data.get('item_desc', '这是一个测试商品')
}
# 生成测试回复
reply = ai_reply_engine.generate_reply(
message=test_message,
item_info=test_item_info,
chat_id=f"test_{int(time.time())}",
cookie_id=cookie_id,
user_id="test_user",
item_id="test_item"
)
if reply:
return {"message": "测试成功", "reply": reply}
else:
raise HTTPException(status_code=400, detail="AI回复生成失败")
except HTTPException:
raise
except Exception as e:
logger.error(f"测试AI回复异常: {e}")
raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}")
# ==================== 日志管理API ====================
@app.get("/logs")
async def get_logs(lines: int = 200, level: str = None, source: str = None, _: None = Depends(require_auth)):
"""获取实时系统日志"""
try:
# 获取文件日志收集器
collector = get_file_log_collector()
# 获取日志
logs = collector.get_logs(lines=lines, level_filter=level, source_filter=source)
return {"success": True, "logs": logs}
except Exception as e:
return {"success": False, "message": f"获取日志失败: {str(e)}", "logs": []}
@app.get("/logs/stats")
async def get_log_stats(_: None = Depends(require_auth)):
"""获取日志统计信息"""
try:
collector = get_file_log_collector()
stats = collector.get_stats()
return {"success": True, "stats": stats}
except Exception as e:
return {"success": False, "message": f"获取日志统计失败: {str(e)}", "stats": {}}
@app.post("/logs/clear")
async def clear_logs(_: None = Depends(require_auth)):
"""清空日志"""
try:
collector = get_file_log_collector()
collector.clear_logs()
return {"success": True, "message": "日志已清空"}
except Exception as e:
return {"success": False, "message": f"清空日志失败: {str(e)}"}
# ==================== 商品管理API ====================
@app.post("/items/get-all-from-account")
async def get_all_items_from_account(request: dict, _: None = Depends(require_auth)):
"""从指定账号获取所有商品信息"""
try:
cookie_id = request.get('cookie_id')
if not cookie_id:
return {"success": False, "message": "缺少cookie_id参数"}
# 获取指定账号的cookie信息
cookie_info = db_manager.get_cookie_by_id(cookie_id)
if not cookie_info:
return {"success": False, "message": "未找到指定的账号信息"}
cookies_str = cookie_info.get('cookies_str', '')
if not cookies_str:
return {"success": False, "message": "账号cookie信息为空"}
# 创建XianyuLive实例传入正确的cookie_id
from XianyuAutoAsync import XianyuLive
xianyu_instance = XianyuLive(cookies_str, cookie_id)
# 调用获取所有商品信息的方法(自动分页)
logger.info(f"开始获取账号 {cookie_id} 的所有商品信息")
result = await xianyu_instance.get_all_items()
# 关闭session
await xianyu_instance.close_session()
if result.get('error'):
logger.error(f"获取商品信息失败: {result['error']}")
return {"success": False, "message": result['error']}
else:
total_count = result.get('total_count', 0)
total_pages = result.get('total_pages', 1)
logger.info(f"成功获取账号 {cookie_id}{total_count} 个商品(共{total_pages}页)")
return {
"success": True,
"message": f"成功获取 {total_count} 个商品(共{total_pages}页),详细信息已打印到控制台",
"total_count": total_count,
"total_pages": total_pages
}
except Exception as e:
logger.error(f"获取账号商品信息异常: {str(e)}")
return {"success": False, "message": f"获取商品信息异常: {str(e)}"}
@app.post("/items/get-by-page")
async def get_items_by_page(request: dict, _: None = Depends(require_auth)):
"""从指定账号按页获取商品信息"""
try:
# 验证参数
cookie_id = request.get('cookie_id')
page_number = request.get('page_number', 1)
page_size = request.get('page_size', 20)
if not cookie_id:
return {"success": False, "message": "缺少cookie_id参数"}
# 验证分页参数
try:
page_number = int(page_number)
page_size = int(page_size)
except (ValueError, TypeError):
return {"success": False, "message": "页码和每页数量必须是数字"}
if page_number < 1:
return {"success": False, "message": "页码必须大于0"}
if page_size < 1 or page_size > 100:
return {"success": False, "message": "每页数量必须在1-100之间"}
# 获取账号信息
account = db_manager.get_cookie_by_id(cookie_id)
if not account:
return {"success": False, "message": "账号不存在"}
cookies_str = account['cookies_str']
if not cookies_str:
return {"success": False, "message": "账号cookies为空"}
# 创建XianyuLive实例传入正确的cookie_id
from XianyuAutoAsync import XianyuLive
xianyu_instance = XianyuLive(cookies_str, cookie_id)
# 调用获取指定页商品信息的方法
logger.info(f"开始获取账号 {cookie_id}{page_number}页商品信息(每页{page_size}条)")
result = await xianyu_instance.get_item_list_info(page_number, page_size)
# 关闭session
await xianyu_instance.close_session()
if result.get('error'):
logger.error(f"获取商品信息失败: {result['error']}")
return {"success": False, "message": result['error']}
else:
current_count = result.get('current_count', 0)
logger.info(f"成功获取账号 {cookie_id}{page_number}{current_count} 个商品")
return {
"success": True,
"message": f"成功获取第{page_number}{current_count} 个商品,详细信息已打印到控制台",
"page_number": page_number,
"page_size": page_size,
"current_count": current_count
}
except Exception as e:
logger.error(f"获取账号商品信息异常: {str(e)}")
return {"success": False, "message": f"获取商品信息异常: {str(e)}"}
# ------------------------- 用户设置接口 -------------------------
@app.get('/user-settings')
def get_user_settings(current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取当前用户的设置"""
from db_manager import db_manager
try:
user_id = current_user['user_id']
settings = db_manager.get_user_settings(user_id)
return settings
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.put('/user-settings/{key}')
def update_user_setting(key: str, setting_data: dict, current_user: Dict[str, Any] = Depends(get_current_user)):
"""更新用户设置"""
from db_manager import db_manager
try:
user_id = current_user['user_id']
value = setting_data.get('value')
description = setting_data.get('description', '')
log_with_user('info', f"更新用户设置: {key} = {value}", current_user)
success = db_manager.set_user_setting(user_id, key, value, description)
if success:
log_with_user('info', f"用户设置更新成功: {key}", current_user)
return {'msg': 'setting updated', 'key': key, 'value': value}
else:
log_with_user('error', f"用户设置更新失败: {key}", current_user)
raise HTTPException(status_code=400, detail='更新失败')
except Exception as e:
log_with_user('error', f"更新用户设置异常: {key} - {str(e)}", current_user)
raise HTTPException(status_code=500, detail=str(e))
@app.get('/user-settings/{key}')
def get_user_setting(key: str, current_user: Dict[str, Any] = Depends(get_current_user)):
"""获取用户特定设置"""
from db_manager import db_manager
try:
user_id = current_user['user_id']
setting = db_manager.get_user_setting(user_id, key)
if setting:
return setting
else:
raise HTTPException(status_code=404, detail='设置不存在')
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ------------------------- 管理员专用接口 -------------------------
@app.get('/admin/users')
def get_all_users(admin_user: Dict[str, Any] = Depends(require_admin)):
"""获取所有用户信息(管理员专用)"""
from db_manager import db_manager
try:
log_with_user('info', "查询所有用户信息", admin_user)
users = db_manager.get_all_users()
# 为每个用户添加统计信息
for user in users:
user_id = user['id']
# 统计用户的Cookie数量
user_cookies = db_manager.get_all_cookies(user_id)
user['cookie_count'] = len(user_cookies)
# 统计用户的卡券数量
user_cards = db_manager.get_all_cards(user_id)
user['card_count'] = len(user_cards) if user_cards else 0
# 隐藏密码字段
if 'password_hash' in user:
del user['password_hash']
log_with_user('info', f"返回用户信息,共 {len(users)} 个用户", admin_user)
return {"users": users}
except Exception as e:
log_with_user('error', f"获取用户信息失败: {str(e)}", admin_user)
raise HTTPException(status_code=500, detail=str(e))
@app.delete('/admin/users/{user_id}')
def delete_user(user_id: int, admin_user: Dict[str, Any] = Depends(require_admin)):
"""删除用户(管理员专用)"""
from db_manager import db_manager
try:
# 不能删除管理员自己
if user_id == admin_user['user_id']:
log_with_user('warning', "尝试删除管理员自己", admin_user)
raise HTTPException(status_code=400, detail="不能删除管理员自己")
# 获取要删除的用户信息
user_to_delete = db_manager.get_user_by_id(user_id)
if not user_to_delete:
raise HTTPException(status_code=404, detail="用户不存在")
log_with_user('info', f"准备删除用户: {user_to_delete['username']} (ID: {user_id})", admin_user)
# 删除用户及其相关数据
success = db_manager.delete_user_and_data(user_id)
if success:
log_with_user('info', f"用户删除成功: {user_to_delete['username']} (ID: {user_id})", admin_user)
return {"message": f"用户 {user_to_delete['username']} 删除成功"}
else:
log_with_user('error', f"用户删除失败: {user_to_delete['username']} (ID: {user_id})", admin_user)
raise HTTPException(status_code=400, detail="删除失败")
except HTTPException:
raise
except Exception as e:
log_with_user('error', f"删除用户异常: {str(e)}", admin_user)
raise HTTPException(status_code=500, detail=str(e))
@app.get('/admin/logs')
def get_system_logs(admin_user: Dict[str, Any] = Depends(require_admin),
lines: int = 100,
level: str = None):
"""获取系统日志(管理员专用)"""
import os
import glob
from datetime import datetime
try:
log_with_user('info', f"查询系统日志,行数: {lines}, 级别: {level}", admin_user)
# 查找日志文件
log_files = glob.glob("logs/xianyu_*.log")
if not log_files:
return {"logs": [], "message": "未找到日志文件"}
# 获取最新的日志文件
latest_log_file = max(log_files, key=os.path.getctime)
logs = []
try:
with open(latest_log_file, 'r', encoding='utf-8') as f:
all_lines = f.readlines()
# 如果指定了日志级别,进行过滤
if level:
filtered_lines = [line for line in all_lines if f"| {level.upper()} |" in line]
else:
filtered_lines = all_lines
# 获取最后N行
recent_lines = filtered_lines[-lines:] if len(filtered_lines) > lines else filtered_lines
for line in recent_lines:
logs.append(line.strip())
except Exception as e:
log_with_user('error', f"读取日志文件失败: {str(e)}", admin_user)
return {"logs": [], "message": f"读取日志文件失败: {str(e)}"}
log_with_user('info', f"返回日志记录 {len(logs)}", admin_user)
return {
"logs": logs,
"log_file": latest_log_file,
"total_lines": len(logs)
}
except Exception as e:
log_with_user('error', f"获取系统日志失败: {str(e)}", admin_user)
raise HTTPException(status_code=500, detail=str(e))
@app.get('/admin/stats')
def get_system_stats(admin_user: Dict[str, Any] = Depends(require_admin)):
"""获取系统统计信息(管理员专用)"""
from db_manager import db_manager
try:
log_with_user('info', "查询系统统计信息", admin_user)
stats = {
"users": {
"total": 0,
"active_today": 0
},
"cookies": {
"total": 0,
"enabled": 0
},
"cards": {
"total": 0,
"enabled": 0
},
"system": {
"uptime": "未知",
"version": "1.0.0"
}
}
# 用户统计
all_users = db_manager.get_all_users()
stats["users"]["total"] = len(all_users)
# Cookie统计
all_cookies = db_manager.get_all_cookies()
stats["cookies"]["total"] = len(all_cookies)
# 卡券统计
all_cards = db_manager.get_all_cards()
if all_cards:
stats["cards"]["total"] = len(all_cards)
stats["cards"]["enabled"] = len([card for card in all_cards if card.get('enabled', True)])
log_with_user('info', "系统统计信息查询完成", admin_user)
return stats
except Exception as e:
log_with_user('error', f"获取系统统计信息失败: {str(e)}", admin_user)
raise HTTPException(status_code=500, detail=str(e))
# ------------------------- 数据库备份和恢复接口 -------------------------
@app.get('/admin/backup/download')
def download_database_backup(admin_user: Dict[str, Any] = Depends(require_admin)):
"""下载数据库备份文件(管理员专用)"""
import os
from fastapi.responses import FileResponse
from datetime import datetime
try:
log_with_user('info', "请求下载数据库备份", admin_user)
# 使用db_manager的实际数据库路径
from db_manager import db_manager
db_file_path = db_manager.db_path
# 检查数据库文件是否存在
if not os.path.exists(db_file_path):
log_with_user('error', f"数据库文件不存在: {db_file_path}", admin_user)
raise HTTPException(status_code=404, detail="数据库文件不存在")
# 生成带时间戳的文件名
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
download_filename = f"xianyu_backup_{timestamp}.db"
log_with_user('info', f"开始下载数据库备份: {download_filename}", admin_user)
return FileResponse(
path=db_file_path,
filename=download_filename,
media_type='application/octet-stream'
)
except HTTPException:
raise
except Exception as e:
log_with_user('error', f"下载数据库备份失败: {str(e)}", admin_user)
raise HTTPException(status_code=500, detail=str(e))
@app.post('/admin/backup/upload')
async def upload_database_backup(admin_user: Dict[str, Any] = Depends(require_admin),
backup_file: UploadFile = File(...)):
"""上传并恢复数据库备份文件(管理员专用)"""
import os
import shutil
import sqlite3
from datetime import datetime
try:
log_with_user('info', f"开始上传数据库备份: {backup_file.filename}", admin_user)
# 验证文件类型
if not backup_file.filename.endswith('.db'):
log_with_user('warning', f"无效的备份文件类型: {backup_file.filename}", admin_user)
raise HTTPException(status_code=400, detail="只支持.db格式的数据库文件")
# 验证文件大小限制100MB
content = await backup_file.read()
if len(content) > 100 * 1024 * 1024: # 100MB
log_with_user('warning', f"备份文件过大: {len(content)} bytes", admin_user)
raise HTTPException(status_code=400, detail="备份文件大小不能超过100MB")
# 验证是否为有效的SQLite数据库文件
temp_file_path = f"temp_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.db"
try:
# 保存临时文件
with open(temp_file_path, 'wb') as temp_file:
temp_file.write(content)
# 验证数据库文件完整性
conn = sqlite3.connect(temp_file_path)
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
conn.close()
# 检查是否包含必要的表
table_names = [table[0] for table in tables]
required_tables = ['users', 'cookies'] # 最基本的表
missing_tables = [table for table in required_tables if table not in table_names]
if missing_tables:
log_with_user('warning', f"备份文件缺少必要的表: {missing_tables}", admin_user)
raise HTTPException(status_code=400, detail=f"备份文件不完整,缺少表: {', '.join(missing_tables)}")
log_with_user('info', f"备份文件验证通过,包含 {len(table_names)} 个表", admin_user)
except sqlite3.Error as e:
log_with_user('error', f"备份文件验证失败: {str(e)}", admin_user)
if os.path.exists(temp_file_path):
os.remove(temp_file_path)
raise HTTPException(status_code=400, detail="无效的数据库文件")
# 备份当前数据库
from db_manager import db_manager
current_db_path = db_manager.db_path
# 生成备份文件路径(与原数据库在同一目录)
db_dir = os.path.dirname(current_db_path)
backup_filename = f"xianyu_data_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.db"
backup_current_path = os.path.join(db_dir, backup_filename)
if os.path.exists(current_db_path):
shutil.copy2(current_db_path, backup_current_path)
log_with_user('info', f"当前数据库已备份为: {backup_current_path}", admin_user)
# 关闭当前数据库连接
if hasattr(db_manager, 'conn') and db_manager.conn:
db_manager.conn.close()
log_with_user('info', "已关闭当前数据库连接", admin_user)
# 替换数据库文件
shutil.move(temp_file_path, current_db_path)
log_with_user('info', f"数据库文件已替换: {current_db_path}", admin_user)
# 重新初始化数据库连接使用原有的db_path
db_manager.__init__(db_manager.db_path)
log_with_user('info', "数据库连接已重新初始化", admin_user)
# 验证新数据库
try:
test_users = db_manager.get_all_users()
log_with_user('info', f"数据库恢复成功,包含 {len(test_users)} 个用户", admin_user)
except Exception as e:
log_with_user('error', f"数据库恢复后验证失败: {str(e)}", admin_user)
# 如果验证失败,尝试恢复原数据库
if os.path.exists(backup_current_path):
shutil.copy2(backup_current_path, current_db_path)
db_manager.__init__()
log_with_user('info', "已恢复原数据库", admin_user)
raise HTTPException(status_code=500, detail="数据库恢复失败,已回滚到原数据库")
return {
"success": True,
"message": "数据库恢复成功",
"backup_file": backup_current_path,
"user_count": len(test_users)
}
except HTTPException:
raise
except Exception as e:
log_with_user('error', f"上传数据库备份失败: {str(e)}", admin_user)
# 清理临时文件
if 'temp_file_path' in locals() and os.path.exists(temp_file_path):
os.remove(temp_file_path)
raise HTTPException(status_code=500, detail=str(e))
@app.get('/admin/backup/list')
def list_backup_files(admin_user: Dict[str, Any] = Depends(require_admin)):
"""列出服务器上的备份文件(管理员专用)"""
import os
import glob
from datetime import datetime
try:
log_with_user('info', "查询备份文件列表", admin_user)
# 查找备份文件
backup_files = glob.glob("xianyu_data_backup_*.db")
backup_list = []
for file_path in backup_files:
try:
stat = os.stat(file_path)
backup_list.append({
'filename': os.path.basename(file_path),
'size': stat.st_size,
'size_mb': round(stat.st_size / (1024 * 1024), 2),
'created_time': datetime.fromtimestamp(stat.st_ctime).strftime('%Y-%m-%d %H:%M:%S'),
'modified_time': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')
})
except Exception as e:
log_with_user('warning', f"读取备份文件信息失败: {file_path} - {str(e)}", admin_user)
# 按修改时间倒序排列
backup_list.sort(key=lambda x: x['modified_time'], reverse=True)
log_with_user('info', f"找到 {len(backup_list)} 个备份文件", admin_user)
return {
"backups": backup_list,
"total": len(backup_list)
}
except Exception as e:
log_with_user('error', f"查询备份文件列表失败: {str(e)}", admin_user)
raise HTTPException(status_code=500, detail=str(e))
# ------------------------- 数据管理接口 -------------------------
@app.get('/admin/data/{table_name}')
def get_table_data(table_name: str, admin_user: Dict[str, Any] = Depends(require_admin)):
"""获取指定表的所有数据(管理员专用)"""
from db_manager import db_manager
try:
log_with_user('info', f"查询表数据: {table_name}", admin_user)
# 验证表名安全性
allowed_tables = [
'users', 'cookies', 'keywords', 'default_replies', 'ai_reply_settings',
'message_notifications', 'cards', 'delivery_rules', 'notification_channels',
'user_settings', 'email_verifications', 'captcha_codes'
]
if table_name not in allowed_tables:
log_with_user('warning', f"尝试访问不允许的表: {table_name}", admin_user)
raise HTTPException(status_code=400, detail="不允许访问该表")
# 获取表数据
data, columns = db_manager.get_table_data(table_name)
log_with_user('info', f"{table_name} 查询成功,共 {len(data)} 条记录", admin_user)
return {
"success": True,
"data": data,
"columns": columns,
"count": len(data)
}
except HTTPException:
raise
except Exception as e:
log_with_user('error', f"查询表数据失败: {table_name} - {str(e)}", admin_user)
raise HTTPException(status_code=500, detail=str(e))
@app.delete('/admin/data/{table_name}/{record_id}')
def delete_table_record(table_name: str, record_id: str, admin_user: Dict[str, Any] = Depends(require_admin)):
"""删除指定表的指定记录(管理员专用)"""
from db_manager import db_manager
try:
log_with_user('info', f"删除表记录: {table_name}.{record_id}", admin_user)
# 验证表名安全性
allowed_tables = [
'users', 'cookies', 'keywords', 'default_replies', 'ai_reply_settings',
'message_notifications', 'cards', 'delivery_rules', 'notification_channels',
'user_settings', 'email_verifications', 'captcha_codes'
]
if table_name not in allowed_tables:
log_with_user('warning', f"尝试删除不允许的表记录: {table_name}", admin_user)
raise HTTPException(status_code=400, detail="不允许操作该表")
# 特殊保护:不能删除管理员用户
if table_name == 'users' and record_id == str(admin_user['user_id']):
log_with_user('warning', "尝试删除管理员自己", admin_user)
raise HTTPException(status_code=400, detail="不能删除管理员自己")
# 删除记录
success = db_manager.delete_table_record(table_name, record_id)
if success:
log_with_user('info', f"表记录删除成功: {table_name}.{record_id}", admin_user)
return {"success": True, "message": "删除成功"}
else:
log_with_user('warning', f"表记录删除失败: {table_name}.{record_id}", admin_user)
raise HTTPException(status_code=400, detail="删除失败,记录可能不存在")
except HTTPException:
raise
except Exception as e:
log_with_user('error', f"删除表记录异常: {table_name}.{record_id} - {str(e)}", admin_user)
raise HTTPException(status_code=500, detail=str(e))
@app.delete('/admin/data/{table_name}')
def clear_table_data(table_name: str, admin_user: Dict[str, Any] = Depends(require_admin)):
"""清空指定表的所有数据(管理员专用)"""
from db_manager import db_manager
try:
log_with_user('info', f"清空表数据: {table_name}", admin_user)
# 验证表名安全性
allowed_tables = [
'cookies', 'keywords', 'default_replies', 'ai_reply_settings',
'message_notifications', 'cards', 'delivery_rules', 'notification_channels',
'user_settings', 'email_verifications', 'captcha_codes'
]
# 不允许清空用户表
if table_name == 'users':
log_with_user('warning', "尝试清空用户表", admin_user)
raise HTTPException(status_code=400, detail="不允许清空用户表")
if table_name not in allowed_tables:
log_with_user('warning', f"尝试清空不允许的表: {table_name}", admin_user)
raise HTTPException(status_code=400, detail="不允许清空该表")
# 清空表数据
success = db_manager.clear_table_data(table_name)
if success:
log_with_user('info', f"表数据清空成功: {table_name}", admin_user)
return {"success": True, "message": "清空成功"}
else:
log_with_user('warning', f"表数据清空失败: {table_name}", admin_user)
raise HTTPException(status_code=400, detail="清空失败")
except HTTPException:
raise
except Exception as e:
log_with_user('error', f"清空表数据异常: {table_name} - {str(e)}", admin_user)
raise HTTPException(status_code=500, detail=str(e))
# 商品多规格管理API
@app.put("/items/{cookie_id}/{item_id}/multi-spec")
def update_item_multi_spec(cookie_id: str, item_id: str, spec_data: dict, _: None = Depends(require_auth)):
"""更新商品的多规格状态"""
try:
from db_manager import db_manager
is_multi_spec = spec_data.get('is_multi_spec', False)
success = db_manager.update_item_multi_spec_status(cookie_id, item_id, is_multi_spec)
if success:
return {"message": f"商品多规格状态已{'开启' if is_multi_spec else '关闭'}"}
else:
raise HTTPException(status_code=404, detail="商品不存在")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080)