mirror of
https://github.com/zhinianboke/xianyu-auto-reply.git
synced 2025-08-01 12:07:36 +08:00
3295 lines
139 KiB
Python
3295 lines
139 KiB
Python
import sqlite3
|
||
import os
|
||
import threading
|
||
import hashlib
|
||
import time
|
||
import json
|
||
import random
|
||
import string
|
||
import aiohttp
|
||
import io
|
||
import base64
|
||
from PIL import Image, ImageDraw, ImageFont
|
||
from typing import List, Tuple, Dict, Optional, Any
|
||
from loguru import logger
|
||
|
||
class DBManager:
|
||
"""SQLite数据库管理,持久化存储Cookie和关键字"""
|
||
|
||
def __init__(self, db_path: str = None):
|
||
"""初始化数据库连接和表结构"""
|
||
# 支持环境变量配置数据库路径
|
||
if db_path is None:
|
||
db_path = os.getenv('DB_PATH', 'xianyu_data.db')
|
||
|
||
# 确保数据目录存在并有正确权限
|
||
db_dir = os.path.dirname(db_path)
|
||
if db_dir and not os.path.exists(db_dir):
|
||
try:
|
||
os.makedirs(db_dir, mode=0o755, exist_ok=True)
|
||
logger.info(f"创建数据目录: {db_dir}")
|
||
except PermissionError as e:
|
||
logger.error(f"创建数据目录失败,权限不足: {e}")
|
||
# 尝试使用当前目录
|
||
db_path = os.path.basename(db_path)
|
||
logger.warning(f"使用当前目录作为数据库路径: {db_path}")
|
||
except Exception as e:
|
||
logger.error(f"创建数据目录失败: {e}")
|
||
raise
|
||
|
||
# 检查目录权限
|
||
if db_dir and os.path.exists(db_dir):
|
||
if not os.access(db_dir, os.W_OK):
|
||
logger.error(f"数据目录没有写权限: {db_dir}")
|
||
# 尝试使用当前目录
|
||
db_path = os.path.basename(db_path)
|
||
logger.warning(f"使用当前目录作为数据库路径: {db_path}")
|
||
|
||
self.db_path = db_path
|
||
logger.info(f"数据库路径: {self.db_path}")
|
||
self.conn = None
|
||
self.lock = threading.RLock() # 使用可重入锁保护数据库操作
|
||
|
||
# SQL日志配置 - 默认启用
|
||
self.sql_log_enabled = True # 默认启用SQL日志
|
||
self.sql_log_level = 'INFO' # 默认使用INFO级别
|
||
|
||
# 允许通过环境变量覆盖默认设置
|
||
if os.getenv('SQL_LOG_ENABLED'):
|
||
self.sql_log_enabled = os.getenv('SQL_LOG_ENABLED', 'true').lower() == 'true'
|
||
if os.getenv('SQL_LOG_LEVEL'):
|
||
self.sql_log_level = os.getenv('SQL_LOG_LEVEL', 'INFO').upper()
|
||
|
||
logger.info(f"SQL日志已启用,日志级别: {self.sql_log_level}")
|
||
|
||
self.init_db()
|
||
|
||
def init_db(self):
|
||
"""初始化数据库表结构"""
|
||
try:
|
||
self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
|
||
cursor = self.conn.cursor()
|
||
|
||
# 创建用户表
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS users (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
username TEXT UNIQUE NOT NULL,
|
||
email TEXT UNIQUE NOT NULL,
|
||
password_hash TEXT NOT NULL,
|
||
is_active BOOLEAN DEFAULT TRUE,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||
)
|
||
''')
|
||
|
||
# 创建邮箱验证码表
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS email_verifications (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
email TEXT NOT NULL,
|
||
code TEXT NOT NULL,
|
||
expires_at TIMESTAMP NOT NULL,
|
||
used BOOLEAN DEFAULT FALSE,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||
)
|
||
''')
|
||
|
||
# 创建图形验证码表
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS captcha_codes (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
session_id TEXT NOT NULL,
|
||
code TEXT NOT NULL,
|
||
expires_at TIMESTAMP NOT NULL,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||
)
|
||
''')
|
||
|
||
# 创建cookies表(添加user_id字段和auto_confirm字段)
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS cookies (
|
||
id TEXT PRIMARY KEY,
|
||
value TEXT NOT NULL,
|
||
user_id INTEGER NOT NULL,
|
||
auto_confirm INTEGER DEFAULT 1,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
|
||
)
|
||
''')
|
||
|
||
# 创建keywords表
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS keywords (
|
||
cookie_id TEXT,
|
||
keyword TEXT,
|
||
reply TEXT,
|
||
item_id TEXT,
|
||
PRIMARY KEY (cookie_id, keyword),
|
||
FOREIGN KEY (cookie_id) REFERENCES cookies(id) ON DELETE CASCADE
|
||
)
|
||
''')
|
||
|
||
# 创建cookie_status表
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS cookie_status (
|
||
cookie_id TEXT PRIMARY KEY,
|
||
enabled BOOLEAN DEFAULT TRUE,
|
||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (cookie_id) REFERENCES cookies(id) ON DELETE CASCADE
|
||
)
|
||
''')
|
||
|
||
# 创建AI回复配置表
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS ai_reply_settings (
|
||
cookie_id TEXT PRIMARY KEY,
|
||
ai_enabled BOOLEAN DEFAULT FALSE,
|
||
model_name TEXT DEFAULT 'qwen-plus',
|
||
api_key TEXT,
|
||
base_url TEXT DEFAULT 'https://dashscope.aliyuncs.com/compatible-mode/v1',
|
||
max_discount_percent INTEGER DEFAULT 10,
|
||
max_discount_amount INTEGER DEFAULT 100,
|
||
max_bargain_rounds INTEGER DEFAULT 3,
|
||
custom_prompts TEXT,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (cookie_id) REFERENCES cookies(id) ON DELETE CASCADE
|
||
)
|
||
''')
|
||
|
||
# 创建AI对话历史表
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS ai_conversations (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
cookie_id TEXT NOT NULL,
|
||
chat_id TEXT NOT NULL,
|
||
user_id TEXT NOT NULL,
|
||
item_id TEXT NOT NULL,
|
||
role TEXT NOT NULL,
|
||
content TEXT NOT NULL,
|
||
intent TEXT,
|
||
bargain_count INTEGER DEFAULT 0,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (cookie_id) REFERENCES cookies (id) ON DELETE CASCADE
|
||
)
|
||
''')
|
||
|
||
# 创建AI商品信息缓存表
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS ai_item_cache (
|
||
item_id TEXT PRIMARY KEY,
|
||
data TEXT NOT NULL,
|
||
price REAL,
|
||
description TEXT,
|
||
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||
)
|
||
''')
|
||
|
||
# 创建卡券表
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS cards (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
name TEXT NOT NULL,
|
||
type TEXT NOT NULL CHECK (type IN ('api', 'text', 'data')),
|
||
api_config TEXT,
|
||
text_content TEXT,
|
||
data_content TEXT,
|
||
description TEXT,
|
||
enabled BOOLEAN DEFAULT TRUE,
|
||
delay_seconds INTEGER DEFAULT 0,
|
||
is_multi_spec BOOLEAN DEFAULT FALSE,
|
||
spec_name TEXT,
|
||
spec_value TEXT,
|
||
user_id INTEGER NOT NULL DEFAULT 1,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (user_id) REFERENCES users (id)
|
||
)
|
||
''')
|
||
|
||
# 检查并添加 user_id 列(用于数据库迁移)
|
||
try:
|
||
self._execute_sql(cursor, "SELECT user_id FROM cards LIMIT 1")
|
||
except sqlite3.OperationalError:
|
||
# user_id 列不存在,需要添加
|
||
logger.info("正在为 cards 表添加 user_id 列...")
|
||
self._execute_sql(cursor, "ALTER TABLE cards ADD COLUMN user_id INTEGER NOT NULL DEFAULT 1")
|
||
self._execute_sql(cursor, "CREATE INDEX IF NOT EXISTS idx_cards_user_id ON cards(user_id)")
|
||
logger.info("cards 表 user_id 列添加完成")
|
||
|
||
# 检查并添加 delay_seconds 列(用于自动发货延时功能)
|
||
try:
|
||
self._execute_sql(cursor, "SELECT delay_seconds FROM cards LIMIT 1")
|
||
except sqlite3.OperationalError:
|
||
# delay_seconds 列不存在,需要添加
|
||
logger.info("正在为 cards 表添加 delay_seconds 列...")
|
||
self._execute_sql(cursor, "ALTER TABLE cards ADD COLUMN delay_seconds INTEGER DEFAULT 0")
|
||
logger.info("cards 表 delay_seconds 列添加完成")
|
||
|
||
# 检查并添加 item_id 列(用于自动回复商品ID功能)
|
||
try:
|
||
self._execute_sql(cursor, "SELECT item_id FROM keywords LIMIT 1")
|
||
except sqlite3.OperationalError:
|
||
# item_id 列不存在,需要添加
|
||
logger.info("正在为 keywords 表添加 item_id 列...")
|
||
self._execute_sql(cursor, "ALTER TABLE keywords ADD COLUMN item_id TEXT")
|
||
logger.info("keywords 表 item_id 列添加完成")
|
||
|
||
# 创建商品信息表
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS item_info (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
cookie_id TEXT NOT NULL,
|
||
item_id TEXT NOT NULL,
|
||
item_title TEXT,
|
||
item_description TEXT,
|
||
item_category TEXT,
|
||
item_price TEXT,
|
||
item_detail TEXT,
|
||
is_multi_spec BOOLEAN DEFAULT FALSE,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (cookie_id) REFERENCES cookies(id) ON DELETE CASCADE,
|
||
UNIQUE(cookie_id, item_id)
|
||
)
|
||
''')
|
||
|
||
# 创建自动发货规则表
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS delivery_rules (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
keyword TEXT NOT NULL,
|
||
card_id INTEGER NOT NULL,
|
||
delivery_count INTEGER DEFAULT 1,
|
||
enabled BOOLEAN DEFAULT TRUE,
|
||
description TEXT,
|
||
delivery_times INTEGER DEFAULT 0,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (card_id) REFERENCES cards(id) ON DELETE CASCADE
|
||
)
|
||
''')
|
||
|
||
# 创建默认回复表
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS default_replies (
|
||
cookie_id TEXT PRIMARY KEY,
|
||
enabled BOOLEAN DEFAULT FALSE,
|
||
reply_content TEXT,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (cookie_id) REFERENCES cookies(id) ON DELETE CASCADE
|
||
)
|
||
''')
|
||
|
||
# 创建通知渠道表
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS notification_channels (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
name TEXT NOT NULL,
|
||
type TEXT NOT NULL CHECK (type IN ('qq')),
|
||
config TEXT NOT NULL,
|
||
enabled BOOLEAN DEFAULT TRUE,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||
)
|
||
''')
|
||
|
||
# 创建消息通知配置表
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS message_notifications (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
cookie_id TEXT NOT NULL,
|
||
channel_id INTEGER NOT NULL,
|
||
enabled BOOLEAN DEFAULT TRUE,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (cookie_id) REFERENCES cookies(id) ON DELETE CASCADE,
|
||
FOREIGN KEY (channel_id) REFERENCES notification_channels(id) ON DELETE CASCADE,
|
||
UNIQUE(cookie_id, channel_id)
|
||
)
|
||
''')
|
||
|
||
# 创建系统设置表
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS system_settings (
|
||
key TEXT PRIMARY KEY,
|
||
value TEXT NOT NULL,
|
||
description TEXT,
|
||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||
)
|
||
''')
|
||
|
||
# 创建用户设置表
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS user_settings (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
user_id INTEGER NOT NULL,
|
||
key TEXT NOT NULL,
|
||
value TEXT NOT NULL,
|
||
description TEXT,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
|
||
UNIQUE(user_id, key)
|
||
)
|
||
''')
|
||
|
||
# 插入默认系统设置(不包括管理员密码,由reply_server.py初始化)
|
||
cursor.execute('''
|
||
INSERT OR IGNORE INTO system_settings (key, value, description) VALUES
|
||
('theme_color', 'blue', '主题颜色')
|
||
''')
|
||
|
||
# 创建默认admin用户(只在首次初始化时创建)
|
||
cursor.execute('SELECT COUNT(*) FROM users WHERE username = ?', ('admin',))
|
||
admin_exists = cursor.fetchone()[0] > 0
|
||
|
||
if not admin_exists:
|
||
# 首次创建admin用户,设置默认密码
|
||
default_password_hash = hashlib.sha256("admin123".encode()).hexdigest()
|
||
cursor.execute('''
|
||
INSERT INTO users (username, email, password_hash) VALUES
|
||
('admin', 'admin@localhost', ?)
|
||
''', (default_password_hash,))
|
||
logger.info("创建默认admin用户,密码: admin123")
|
||
|
||
# 获取admin用户ID,用于历史数据绑定
|
||
self._execute_sql(cursor, "SELECT id FROM users WHERE username = 'admin'")
|
||
admin_user = cursor.fetchone()
|
||
if admin_user:
|
||
admin_user_id = admin_user[0]
|
||
|
||
# 将历史cookies数据绑定到admin用户(如果user_id列不存在)
|
||
try:
|
||
self._execute_sql(cursor, "SELECT user_id FROM cookies LIMIT 1")
|
||
except sqlite3.OperationalError:
|
||
# user_id列不存在,需要添加并更新历史数据
|
||
self._execute_sql(cursor, "ALTER TABLE cookies ADD COLUMN user_id INTEGER")
|
||
self._execute_sql(cursor, "UPDATE cookies SET user_id = ? WHERE user_id IS NULL", (admin_user_id,))
|
||
else:
|
||
# user_id列存在,更新NULL值
|
||
self._execute_sql(cursor, "UPDATE cookies SET user_id = ? WHERE user_id IS NULL", (admin_user_id,))
|
||
|
||
# 为cookies表添加auto_confirm字段(如果不存在)
|
||
try:
|
||
self._execute_sql(cursor, "SELECT auto_confirm FROM cookies LIMIT 1")
|
||
except sqlite3.OperationalError:
|
||
# auto_confirm列不存在,需要添加并设置默认值
|
||
self._execute_sql(cursor, "ALTER TABLE cookies ADD COLUMN auto_confirm INTEGER DEFAULT 1")
|
||
self._execute_sql(cursor, "UPDATE cookies SET auto_confirm = 1 WHERE auto_confirm IS NULL")
|
||
else:
|
||
# auto_confirm列存在,更新NULL值
|
||
self._execute_sql(cursor, "UPDATE cookies SET auto_confirm = 1 WHERE auto_confirm IS NULL")
|
||
|
||
# 为delivery_rules表添加user_id字段(如果不存在)
|
||
try:
|
||
self._execute_sql(cursor, "SELECT user_id FROM delivery_rules LIMIT 1")
|
||
except sqlite3.OperationalError:
|
||
# user_id列不存在,需要添加并更新历史数据
|
||
self._execute_sql(cursor, "ALTER TABLE delivery_rules ADD COLUMN user_id INTEGER")
|
||
self._execute_sql(cursor, "UPDATE delivery_rules SET user_id = ? WHERE user_id IS NULL", (admin_user_id,))
|
||
else:
|
||
# user_id列存在,更新NULL值
|
||
self._execute_sql(cursor, "UPDATE delivery_rules SET user_id = ? WHERE user_id IS NULL", (admin_user_id,))
|
||
|
||
# 为notification_channels表添加user_id字段(如果不存在)
|
||
try:
|
||
self._execute_sql(cursor, "SELECT user_id FROM notification_channels LIMIT 1")
|
||
except sqlite3.OperationalError:
|
||
# user_id列不存在,需要添加并更新历史数据
|
||
self._execute_sql(cursor, "ALTER TABLE notification_channels ADD COLUMN user_id INTEGER")
|
||
self._execute_sql(cursor, "UPDATE notification_channels SET user_id = ? WHERE user_id IS NULL", (admin_user_id,))
|
||
else:
|
||
# user_id列存在,更新NULL值
|
||
self._execute_sql(cursor, "UPDATE notification_channels SET user_id = ? WHERE user_id IS NULL", (admin_user_id,))
|
||
|
||
# 为email_verifications表添加type字段(如果不存在)
|
||
try:
|
||
self._execute_sql(cursor, "SELECT type FROM email_verifications LIMIT 1")
|
||
except sqlite3.OperationalError:
|
||
# type列不存在,需要添加并更新历史数据
|
||
self._execute_sql(cursor, "ALTER TABLE email_verifications ADD COLUMN type TEXT DEFAULT 'register'")
|
||
self._execute_sql(cursor, "UPDATE email_verifications SET type = 'register' WHERE type IS NULL")
|
||
else:
|
||
# type列存在,更新NULL值
|
||
self._execute_sql(cursor, "UPDATE email_verifications SET type = 'register' WHERE type IS NULL")
|
||
|
||
# 为cards表添加多规格字段(如果不存在)
|
||
try:
|
||
self._execute_sql(cursor, "SELECT is_multi_spec FROM cards LIMIT 1")
|
||
except sqlite3.OperationalError:
|
||
# 多规格字段不存在,需要添加
|
||
self._execute_sql(cursor, "ALTER TABLE cards ADD COLUMN is_multi_spec BOOLEAN DEFAULT FALSE")
|
||
self._execute_sql(cursor, "ALTER TABLE cards ADD COLUMN spec_name TEXT")
|
||
self._execute_sql(cursor, "ALTER TABLE cards ADD COLUMN spec_value TEXT")
|
||
logger.info("为cards表添加多规格字段")
|
||
|
||
# 为item_info表添加多规格字段(如果不存在)
|
||
try:
|
||
self._execute_sql(cursor, "SELECT is_multi_spec FROM item_info LIMIT 1")
|
||
except sqlite3.OperationalError:
|
||
# 多规格字段不存在,需要添加
|
||
self._execute_sql(cursor, "ALTER TABLE item_info ADD COLUMN is_multi_spec BOOLEAN DEFAULT FALSE")
|
||
logger.info("为item_info表添加多规格字段")
|
||
|
||
self.conn.commit()
|
||
logger.info(f"数据库初始化成功: {self.db_path}")
|
||
except Exception as e:
|
||
logger.error(f"数据库初始化失败: {e}")
|
||
if self.conn:
|
||
self.conn.close()
|
||
raise
|
||
|
||
def close(self):
|
||
"""关闭数据库连接"""
|
||
if self.conn:
|
||
self.conn.close()
|
||
self.conn = None
|
||
|
||
def get_connection(self):
|
||
"""获取数据库连接,如果已关闭则重新连接"""
|
||
if self.conn is None:
|
||
self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
|
||
return self.conn
|
||
|
||
def _log_sql(self, sql: str, params: tuple = None, operation: str = "EXECUTE"):
|
||
"""记录SQL执行日志"""
|
||
if not self.sql_log_enabled:
|
||
return
|
||
|
||
# 格式化参数
|
||
params_str = ""
|
||
if params:
|
||
if isinstance(params, (list, tuple)):
|
||
if len(params) > 0:
|
||
# 限制参数长度,避免日志过长
|
||
formatted_params = []
|
||
for param in params:
|
||
if isinstance(param, str) and len(param) > 100:
|
||
formatted_params.append(f"{param[:100]}...")
|
||
else:
|
||
formatted_params.append(repr(param))
|
||
params_str = f" | 参数: [{', '.join(formatted_params)}]"
|
||
|
||
# 格式化SQL(移除多余空白)
|
||
formatted_sql = ' '.join(sql.split())
|
||
|
||
# 根据配置的日志级别输出
|
||
log_message = f"🗄️ SQL {operation}: {formatted_sql}{params_str}"
|
||
|
||
if self.sql_log_level == 'DEBUG':
|
||
logger.debug(log_message)
|
||
elif self.sql_log_level == 'INFO':
|
||
logger.info(log_message)
|
||
elif self.sql_log_level == 'WARNING':
|
||
logger.warning(log_message)
|
||
else:
|
||
logger.debug(log_message)
|
||
|
||
def _execute_sql(self, cursor, sql: str, params: tuple = None):
|
||
"""执行SQL并记录日志"""
|
||
self._log_sql(sql, params, "EXECUTE")
|
||
if params:
|
||
return cursor.execute(sql, params)
|
||
else:
|
||
return cursor.execute(sql)
|
||
|
||
def _executemany_sql(self, cursor, sql: str, params_list):
|
||
"""批量执行SQL并记录日志"""
|
||
self._log_sql(sql, f"批量执行 {len(params_list)} 条记录", "EXECUTEMANY")
|
||
return cursor.executemany(sql, params_list)
|
||
|
||
# -------------------- Cookie操作 --------------------
|
||
def save_cookie(self, cookie_id: str, cookie_value: str, user_id: int = None) -> bool:
|
||
"""保存Cookie到数据库,如存在则更新"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
|
||
# 如果没有提供user_id,尝试从现有记录获取,否则使用admin用户ID
|
||
if user_id is None:
|
||
self._execute_sql(cursor, "SELECT user_id FROM cookies WHERE id = ?", (cookie_id,))
|
||
existing = cursor.fetchone()
|
||
if existing:
|
||
user_id = existing[0]
|
||
else:
|
||
# 获取admin用户ID作为默认值
|
||
self._execute_sql(cursor, "SELECT id FROM users WHERE username = 'admin'")
|
||
admin_user = cursor.fetchone()
|
||
user_id = admin_user[0] if admin_user else 1
|
||
|
||
self._execute_sql(cursor,
|
||
"INSERT OR REPLACE INTO cookies (id, value, user_id) VALUES (?, ?, ?)",
|
||
(cookie_id, cookie_value, user_id)
|
||
)
|
||
self.conn.commit()
|
||
logger.info(f"Cookie保存成功: {cookie_id} (用户ID: {user_id})")
|
||
|
||
# 验证保存结果
|
||
self._execute_sql(cursor, "SELECT user_id FROM cookies WHERE id = ?", (cookie_id,))
|
||
saved_user_id = cursor.fetchone()
|
||
if saved_user_id:
|
||
logger.info(f"Cookie保存验证: {cookie_id} 实际绑定到用户ID: {saved_user_id[0]}")
|
||
else:
|
||
logger.error(f"Cookie保存验证失败: {cookie_id} 未找到记录")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"Cookie保存失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
def delete_cookie(self, cookie_id: str) -> bool:
|
||
"""从数据库删除Cookie及其关键字"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
# 删除关联的关键字
|
||
self._execute_sql(cursor, "DELETE FROM keywords WHERE cookie_id = ?", (cookie_id,))
|
||
# 删除Cookie
|
||
self._execute_sql(cursor, "DELETE FROM cookies WHERE id = ?", (cookie_id,))
|
||
self.conn.commit()
|
||
logger.debug(f"Cookie删除成功: {cookie_id}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"Cookie删除失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
def get_cookie(self, cookie_id: str) -> Optional[str]:
|
||
"""获取指定Cookie值"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
self._execute_sql(cursor, "SELECT value FROM cookies WHERE id = ?", (cookie_id,))
|
||
result = cursor.fetchone()
|
||
return result[0] if result else None
|
||
except Exception as e:
|
||
logger.error(f"获取Cookie失败: {e}")
|
||
return None
|
||
|
||
def get_all_cookies(self, user_id: int = None) -> Dict[str, str]:
|
||
"""获取所有Cookie(支持用户隔离)"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
if user_id is not None:
|
||
self._execute_sql(cursor, "SELECT id, value FROM cookies WHERE user_id = ?", (user_id,))
|
||
else:
|
||
self._execute_sql(cursor, "SELECT id, value FROM cookies")
|
||
return {row[0]: row[1] for row in cursor.fetchall()}
|
||
except Exception as e:
|
||
logger.error(f"获取所有Cookie失败: {e}")
|
||
return {}
|
||
|
||
def get_cookie_by_id(self, cookie_id: str) -> Optional[Dict[str, str]]:
|
||
"""根据ID获取Cookie信息
|
||
|
||
Args:
|
||
cookie_id: Cookie ID
|
||
|
||
Returns:
|
||
Dict包含cookie信息,包括cookies_str字段,如果不存在返回None
|
||
"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
self._execute_sql(cursor, "SELECT id, value, created_at FROM cookies WHERE id = ?", (cookie_id,))
|
||
result = cursor.fetchone()
|
||
if result:
|
||
return {
|
||
'id': result[0],
|
||
'cookies_str': result[1], # 使用cookies_str字段名以匹配调用方期望
|
||
'value': result[1], # 保持向后兼容
|
||
'created_at': result[2]
|
||
}
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"根据ID获取Cookie失败: {e}")
|
||
return None
|
||
|
||
def get_cookie_details(self, cookie_id: str) -> Optional[Dict[str, any]]:
|
||
"""获取Cookie的详细信息,包括user_id和auto_confirm"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
self._execute_sql(cursor, "SELECT id, value, user_id, auto_confirm, created_at FROM cookies WHERE id = ?", (cookie_id,))
|
||
result = cursor.fetchone()
|
||
if result:
|
||
return {
|
||
'id': result[0],
|
||
'value': result[1],
|
||
'user_id': result[2],
|
||
'auto_confirm': bool(result[3]),
|
||
'created_at': result[4]
|
||
}
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取Cookie详细信息失败: {e}")
|
||
return None
|
||
|
||
def update_auto_confirm(self, cookie_id: str, auto_confirm: bool) -> bool:
|
||
"""更新Cookie的自动确认发货设置"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
self._execute_sql(cursor, "UPDATE cookies SET auto_confirm = ? WHERE id = ?", (int(auto_confirm), cookie_id))
|
||
self.conn.commit()
|
||
logger.info(f"更新账号 {cookie_id} 自动确认发货设置: {'开启' if auto_confirm else '关闭'}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"更新自动确认发货设置失败: {e}")
|
||
return False
|
||
|
||
def get_auto_confirm(self, cookie_id: str) -> bool:
|
||
"""获取Cookie的自动确认发货设置"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
self._execute_sql(cursor, "SELECT auto_confirm FROM cookies WHERE id = ?", (cookie_id,))
|
||
result = cursor.fetchone()
|
||
if result:
|
||
return bool(result[0])
|
||
return True # 默认开启
|
||
except Exception as e:
|
||
logger.error(f"获取自动确认发货设置失败: {e}")
|
||
return True # 出错时默认开启
|
||
|
||
# -------------------- 关键字操作 --------------------
|
||
def save_keywords(self, cookie_id: str, keywords: List[Tuple[str, str]]) -> bool:
|
||
"""保存关键字列表,先删除旧数据再插入新数据(向后兼容方法)"""
|
||
# 转换为新格式(不包含item_id)
|
||
keywords_with_item_id = [(keyword, reply, None) for keyword, reply in keywords]
|
||
return self.save_keywords_with_item_id(cookie_id, keywords_with_item_id)
|
||
|
||
def save_keywords_with_item_id(self, cookie_id: str, keywords: List[Tuple[str, str, str]]) -> bool:
|
||
"""保存关键字列表(包含商品ID),先删除旧数据再插入新数据"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
|
||
# 先删除该cookie_id的所有关键字
|
||
self._execute_sql(cursor, "DELETE FROM keywords WHERE cookie_id = ?", (cookie_id,))
|
||
|
||
# 插入新关键字
|
||
for keyword, reply, item_id in keywords:
|
||
self._execute_sql(cursor,
|
||
"INSERT INTO keywords (cookie_id, keyword, reply, item_id) VALUES (?, ?, ?, ?)",
|
||
(cookie_id, keyword, reply, item_id))
|
||
|
||
self.conn.commit()
|
||
logger.info(f"关键字保存成功: {cookie_id}, {len(keywords)}条")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"关键字保存失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
def get_keywords(self, cookie_id: str) -> List[Tuple[str, str]]:
|
||
"""获取指定Cookie的关键字列表(向后兼容方法)"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
self._execute_sql(cursor, "SELECT keyword, reply FROM keywords WHERE cookie_id = ?", (cookie_id,))
|
||
return [(row[0], row[1]) for row in cursor.fetchall()]
|
||
except Exception as e:
|
||
logger.error(f"获取关键字失败: {e}")
|
||
return []
|
||
|
||
def get_keywords_with_item_id(self, cookie_id: str) -> List[Tuple[str, str, str]]:
|
||
"""获取指定Cookie的关键字列表(包含商品ID)"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
self._execute_sql(cursor, "SELECT keyword, reply, item_id FROM keywords WHERE cookie_id = ?", (cookie_id,))
|
||
return [(row[0], row[1], row[2]) for row in cursor.fetchall()]
|
||
except Exception as e:
|
||
logger.error(f"获取关键字失败: {e}")
|
||
return []
|
||
|
||
def check_keyword_duplicate(self, cookie_id: str, keyword: str, item_id: str = None) -> bool:
|
||
"""检查关键词是否重复"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
if item_id:
|
||
# 如果有商品ID,检查相同cookie_id、keyword、item_id的组合
|
||
self._execute_sql(cursor,
|
||
"SELECT COUNT(*) FROM keywords WHERE cookie_id = ? AND keyword = ? AND item_id = ?",
|
||
(cookie_id, keyword, item_id))
|
||
else:
|
||
# 如果没有商品ID,检查相同cookie_id、keyword且item_id为空的组合
|
||
self._execute_sql(cursor,
|
||
"SELECT COUNT(*) FROM keywords WHERE cookie_id = ? AND keyword = ? AND (item_id IS NULL OR item_id = '')",
|
||
(cookie_id, keyword))
|
||
|
||
count = cursor.fetchone()[0]
|
||
return count > 0
|
||
except Exception as e:
|
||
logger.error(f"检查关键词重复失败: {e}")
|
||
return False
|
||
|
||
|
||
|
||
def get_all_keywords(self, user_id: int = None) -> Dict[str, List[Tuple[str, str]]]:
|
||
"""获取所有Cookie的关键字(支持用户隔离)"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
if user_id is not None:
|
||
cursor.execute("""
|
||
SELECT k.cookie_id, k.keyword, k.reply
|
||
FROM keywords k
|
||
JOIN cookies c ON k.cookie_id = c.id
|
||
WHERE c.user_id = ?
|
||
""", (user_id,))
|
||
else:
|
||
self._execute_sql(cursor, "SELECT cookie_id, keyword, reply FROM keywords")
|
||
|
||
result = {}
|
||
for row in cursor.fetchall():
|
||
cookie_id, keyword, reply = row
|
||
if cookie_id not in result:
|
||
result[cookie_id] = []
|
||
result[cookie_id].append((keyword, reply))
|
||
|
||
return result
|
||
except Exception as e:
|
||
logger.error(f"获取所有关键字失败: {e}")
|
||
return {}
|
||
|
||
def save_cookie_status(self, cookie_id: str, enabled: bool):
|
||
"""保存Cookie的启用状态"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
INSERT OR REPLACE INTO cookie_status (cookie_id, enabled, updated_at)
|
||
VALUES (?, ?, CURRENT_TIMESTAMP)
|
||
''', (cookie_id, enabled))
|
||
self.conn.commit()
|
||
logger.debug(f"保存Cookie状态: {cookie_id} -> {'启用' if enabled else '禁用'}")
|
||
except Exception as e:
|
||
logger.error(f"保存Cookie状态失败: {e}")
|
||
raise
|
||
|
||
def get_cookie_status(self, cookie_id: str) -> bool:
|
||
"""获取Cookie的启用状态"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('SELECT enabled FROM cookie_status WHERE cookie_id = ?', (cookie_id,))
|
||
result = cursor.fetchone()
|
||
return bool(result[0]) if result else True # 默认启用
|
||
except Exception as e:
|
||
logger.error(f"获取Cookie状态失败: {e}")
|
||
return True # 出错时默认启用
|
||
|
||
def get_all_cookie_status(self) -> Dict[str, bool]:
|
||
"""获取所有Cookie的启用状态"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('SELECT cookie_id, enabled FROM cookie_status')
|
||
|
||
result = {}
|
||
for row in cursor.fetchall():
|
||
cookie_id, enabled = row
|
||
result[cookie_id] = bool(enabled)
|
||
|
||
return result
|
||
except Exception as e:
|
||
logger.error(f"获取所有Cookie状态失败: {e}")
|
||
return {}
|
||
|
||
# -------------------- AI回复设置操作 --------------------
|
||
def save_ai_reply_settings(self, cookie_id: str, settings: dict) -> bool:
|
||
"""保存AI回复设置"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
INSERT OR REPLACE INTO ai_reply_settings
|
||
(cookie_id, ai_enabled, model_name, api_key, base_url,
|
||
max_discount_percent, max_discount_amount, max_bargain_rounds,
|
||
custom_prompts, updated_at)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
|
||
''', (
|
||
cookie_id,
|
||
settings.get('ai_enabled', False),
|
||
settings.get('model_name', 'qwen-plus'),
|
||
settings.get('api_key', ''),
|
||
settings.get('base_url', 'https://dashscope.aliyuncs.com/compatible-mode/v1'),
|
||
settings.get('max_discount_percent', 10),
|
||
settings.get('max_discount_amount', 100),
|
||
settings.get('max_bargain_rounds', 3),
|
||
settings.get('custom_prompts', '')
|
||
))
|
||
self.conn.commit()
|
||
logger.debug(f"AI回复设置保存成功: {cookie_id}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"保存AI回复设置失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
def get_ai_reply_settings(self, cookie_id: str) -> dict:
|
||
"""获取AI回复设置"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
SELECT ai_enabled, model_name, api_key, base_url,
|
||
max_discount_percent, max_discount_amount, max_bargain_rounds,
|
||
custom_prompts
|
||
FROM ai_reply_settings WHERE cookie_id = ?
|
||
''', (cookie_id,))
|
||
|
||
result = cursor.fetchone()
|
||
if result:
|
||
return {
|
||
'ai_enabled': bool(result[0]),
|
||
'model_name': result[1],
|
||
'api_key': result[2],
|
||
'base_url': result[3],
|
||
'max_discount_percent': result[4],
|
||
'max_discount_amount': result[5],
|
||
'max_bargain_rounds': result[6],
|
||
'custom_prompts': result[7]
|
||
}
|
||
else:
|
||
# 返回默认设置
|
||
return {
|
||
'ai_enabled': False,
|
||
'model_name': 'qwen-plus',
|
||
'api_key': '',
|
||
'base_url': 'https://dashscope.aliyuncs.com/compatible-mode/v1',
|
||
'max_discount_percent': 10,
|
||
'max_discount_amount': 100,
|
||
'max_bargain_rounds': 3,
|
||
'custom_prompts': ''
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"获取AI回复设置失败: {e}")
|
||
return {
|
||
'ai_enabled': False,
|
||
'model_name': 'qwen-plus',
|
||
'api_key': '',
|
||
'base_url': 'https://dashscope.aliyuncs.com/compatible-mode/v1',
|
||
'max_discount_percent': 10,
|
||
'max_discount_amount': 100,
|
||
'max_bargain_rounds': 3,
|
||
'custom_prompts': ''
|
||
}
|
||
|
||
def get_all_ai_reply_settings(self) -> Dict[str, dict]:
|
||
"""获取所有账号的AI回复设置"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
SELECT cookie_id, ai_enabled, model_name, api_key, base_url,
|
||
max_discount_percent, max_discount_amount, max_bargain_rounds,
|
||
custom_prompts
|
||
FROM ai_reply_settings
|
||
''')
|
||
|
||
result = {}
|
||
for row in cursor.fetchall():
|
||
cookie_id = row[0]
|
||
result[cookie_id] = {
|
||
'ai_enabled': bool(row[1]),
|
||
'model_name': row[2],
|
||
'api_key': row[3],
|
||
'base_url': row[4],
|
||
'max_discount_percent': row[5],
|
||
'max_discount_amount': row[6],
|
||
'max_bargain_rounds': row[7],
|
||
'custom_prompts': row[8]
|
||
}
|
||
|
||
return result
|
||
except Exception as e:
|
||
logger.error(f"获取所有AI回复设置失败: {e}")
|
||
return {}
|
||
|
||
# -------------------- 默认回复操作 --------------------
|
||
def save_default_reply(self, cookie_id: str, enabled: bool, reply_content: str = None):
|
||
"""保存默认回复设置"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
INSERT OR REPLACE INTO default_replies (cookie_id, enabled, reply_content, updated_at)
|
||
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
|
||
''', (cookie_id, enabled, reply_content))
|
||
self.conn.commit()
|
||
logger.debug(f"保存默认回复设置: {cookie_id} -> {'启用' if enabled else '禁用'}")
|
||
except Exception as e:
|
||
logger.error(f"保存默认回复设置失败: {e}")
|
||
raise
|
||
|
||
def get_default_reply(self, cookie_id: str) -> Optional[Dict[str, any]]:
|
||
"""获取指定账号的默认回复设置"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
SELECT enabled, reply_content FROM default_replies WHERE cookie_id = ?
|
||
''', (cookie_id,))
|
||
result = cursor.fetchone()
|
||
if result:
|
||
enabled, reply_content = result
|
||
return {
|
||
'enabled': bool(enabled),
|
||
'reply_content': reply_content or ''
|
||
}
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取默认回复设置失败: {e}")
|
||
return None
|
||
|
||
def get_all_default_replies(self) -> Dict[str, Dict[str, any]]:
|
||
"""获取所有账号的默认回复设置"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('SELECT cookie_id, enabled, reply_content FROM default_replies')
|
||
|
||
result = {}
|
||
for row in cursor.fetchall():
|
||
cookie_id, enabled, reply_content = row
|
||
result[cookie_id] = {
|
||
'enabled': bool(enabled),
|
||
'reply_content': reply_content or ''
|
||
}
|
||
|
||
return result
|
||
except Exception as e:
|
||
logger.error(f"获取所有默认回复设置失败: {e}")
|
||
return {}
|
||
|
||
def delete_default_reply(self, cookie_id: str) -> bool:
|
||
"""删除指定账号的默认回复设置"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
self._execute_sql(cursor, "DELETE FROM default_replies WHERE cookie_id = ?", (cookie_id,))
|
||
self.conn.commit()
|
||
logger.debug(f"删除默认回复设置: {cookie_id}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"删除默认回复设置失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
# -------------------- 通知渠道操作 --------------------
|
||
def create_notification_channel(self, name: str, channel_type: str, config: str, user_id: int = None) -> int:
|
||
"""创建通知渠道"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
INSERT INTO notification_channels (name, type, config, user_id)
|
||
VALUES (?, ?, ?, ?)
|
||
''', (name, channel_type, config, user_id))
|
||
self.conn.commit()
|
||
channel_id = cursor.lastrowid
|
||
logger.debug(f"创建通知渠道: {name} (ID: {channel_id})")
|
||
return channel_id
|
||
except Exception as e:
|
||
logger.error(f"创建通知渠道失败: {e}")
|
||
self.conn.rollback()
|
||
raise
|
||
|
||
def get_notification_channels(self, user_id: int = None) -> List[Dict[str, any]]:
|
||
"""获取所有通知渠道"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
if user_id is not None:
|
||
cursor.execute('''
|
||
SELECT id, name, type, config, enabled, created_at, updated_at
|
||
FROM notification_channels
|
||
WHERE user_id = ?
|
||
ORDER BY created_at DESC
|
||
''', (user_id,))
|
||
else:
|
||
cursor.execute('''
|
||
SELECT id, name, type, config, enabled, created_at, updated_at
|
||
FROM notification_channels
|
||
ORDER BY created_at DESC
|
||
''')
|
||
|
||
channels = []
|
||
for row in cursor.fetchall():
|
||
channels.append({
|
||
'id': row[0],
|
||
'name': row[1],
|
||
'type': row[2],
|
||
'config': row[3],
|
||
'enabled': bool(row[4]),
|
||
'created_at': row[5],
|
||
'updated_at': row[6]
|
||
})
|
||
|
||
return channels
|
||
except Exception as e:
|
||
logger.error(f"获取通知渠道失败: {e}")
|
||
return []
|
||
|
||
def get_notification_channel(self, channel_id: int) -> Optional[Dict[str, any]]:
|
||
"""获取指定通知渠道"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
SELECT id, name, type, config, enabled, created_at, updated_at
|
||
FROM notification_channels WHERE id = ?
|
||
''', (channel_id,))
|
||
|
||
row = cursor.fetchone()
|
||
if row:
|
||
return {
|
||
'id': row[0],
|
||
'name': row[1],
|
||
'type': row[2],
|
||
'config': row[3],
|
||
'enabled': bool(row[4]),
|
||
'created_at': row[5],
|
||
'updated_at': row[6]
|
||
}
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取通知渠道失败: {e}")
|
||
return None
|
||
|
||
def update_notification_channel(self, channel_id: int, name: str, config: str, enabled: bool = True) -> bool:
|
||
"""更新通知渠道"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
UPDATE notification_channels
|
||
SET name = ?, config = ?, enabled = ?, updated_at = CURRENT_TIMESTAMP
|
||
WHERE id = ?
|
||
''', (name, config, enabled, channel_id))
|
||
self.conn.commit()
|
||
logger.debug(f"更新通知渠道: {channel_id}")
|
||
return cursor.rowcount > 0
|
||
except Exception as e:
|
||
logger.error(f"更新通知渠道失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
def delete_notification_channel(self, channel_id: int) -> bool:
|
||
"""删除通知渠道"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
self._execute_sql(cursor, "DELETE FROM notification_channels WHERE id = ?", (channel_id,))
|
||
self.conn.commit()
|
||
logger.debug(f"删除通知渠道: {channel_id}")
|
||
return cursor.rowcount > 0
|
||
except Exception as e:
|
||
logger.error(f"删除通知渠道失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
# -------------------- 消息通知配置操作 --------------------
|
||
def set_message_notification(self, cookie_id: str, channel_id: int, enabled: bool = True) -> bool:
|
||
"""设置账号的消息通知"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
INSERT OR REPLACE INTO message_notifications (cookie_id, channel_id, enabled)
|
||
VALUES (?, ?, ?)
|
||
''', (cookie_id, channel_id, enabled))
|
||
self.conn.commit()
|
||
logger.debug(f"设置消息通知: {cookie_id} -> {channel_id}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"设置消息通知失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
def get_account_notifications(self, cookie_id: str) -> List[Dict[str, any]]:
|
||
"""获取账号的通知配置"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
SELECT mn.id, mn.channel_id, mn.enabled, nc.name, nc.type, nc.config
|
||
FROM message_notifications mn
|
||
JOIN notification_channels nc ON mn.channel_id = nc.id
|
||
WHERE mn.cookie_id = ? AND nc.enabled = 1
|
||
ORDER BY mn.id
|
||
''', (cookie_id,))
|
||
|
||
notifications = []
|
||
for row in cursor.fetchall():
|
||
notifications.append({
|
||
'id': row[0],
|
||
'channel_id': row[1],
|
||
'enabled': bool(row[2]),
|
||
'channel_name': row[3],
|
||
'channel_type': row[4],
|
||
'channel_config': row[5]
|
||
})
|
||
|
||
return notifications
|
||
except Exception as e:
|
||
logger.error(f"获取账号通知配置失败: {e}")
|
||
return []
|
||
|
||
def get_all_message_notifications(self) -> Dict[str, List[Dict[str, any]]]:
|
||
"""获取所有账号的通知配置"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
SELECT mn.cookie_id, mn.id, mn.channel_id, mn.enabled, nc.name, nc.type, nc.config
|
||
FROM message_notifications mn
|
||
JOIN notification_channels nc ON mn.channel_id = nc.id
|
||
WHERE nc.enabled = 1
|
||
ORDER BY mn.cookie_id, mn.id
|
||
''')
|
||
|
||
result = {}
|
||
for row in cursor.fetchall():
|
||
cookie_id = row[0]
|
||
if cookie_id not in result:
|
||
result[cookie_id] = []
|
||
|
||
result[cookie_id].append({
|
||
'id': row[1],
|
||
'channel_id': row[2],
|
||
'enabled': bool(row[3]),
|
||
'channel_name': row[4],
|
||
'channel_type': row[5],
|
||
'channel_config': row[6]
|
||
})
|
||
|
||
return result
|
||
except Exception as e:
|
||
logger.error(f"获取所有消息通知配置失败: {e}")
|
||
return {}
|
||
|
||
def delete_message_notification(self, notification_id: int) -> bool:
|
||
"""删除消息通知配置"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
self._execute_sql(cursor, "DELETE FROM message_notifications WHERE id = ?", (notification_id,))
|
||
self.conn.commit()
|
||
logger.debug(f"删除消息通知配置: {notification_id}")
|
||
return cursor.rowcount > 0
|
||
except Exception as e:
|
||
logger.error(f"删除消息通知配置失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
def delete_account_notifications(self, cookie_id: str) -> bool:
|
||
"""删除账号的所有消息通知配置"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
self._execute_sql(cursor, "DELETE FROM message_notifications WHERE cookie_id = ?", (cookie_id,))
|
||
self.conn.commit()
|
||
logger.debug(f"删除账号通知配置: {cookie_id}")
|
||
return cursor.rowcount > 0
|
||
except Exception as e:
|
||
logger.error(f"删除账号通知配置失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
# -------------------- 备份和恢复操作 --------------------
|
||
def export_backup(self, user_id: int = None) -> Dict[str, any]:
|
||
"""导出系统备份数据(支持用户隔离)"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
backup_data = {
|
||
'version': '1.0',
|
||
'timestamp': time.time(),
|
||
'user_id': user_id,
|
||
'data': {}
|
||
}
|
||
|
||
if user_id is not None:
|
||
# 用户级备份:只备份该用户的数据
|
||
# 备份用户的cookies
|
||
self._execute_sql(cursor, "SELECT * FROM cookies WHERE user_id = ?", (user_id,))
|
||
columns = [description[0] for description in cursor.description]
|
||
rows = cursor.fetchall()
|
||
backup_data['data']['cookies'] = {
|
||
'columns': columns,
|
||
'rows': [list(row) for row in rows]
|
||
}
|
||
|
||
# 备份用户cookies相关的其他数据
|
||
user_cookie_ids = [row[0] for row in rows] # 获取用户的cookie_id列表
|
||
|
||
if user_cookie_ids:
|
||
placeholders = ','.join(['?' for _ in user_cookie_ids])
|
||
|
||
# 备份关键字
|
||
cursor.execute(f"SELECT * FROM keywords WHERE cookie_id IN ({placeholders})", user_cookie_ids)
|
||
columns = [description[0] for description in cursor.description]
|
||
rows = cursor.fetchall()
|
||
backup_data['data']['keywords'] = {
|
||
'columns': columns,
|
||
'rows': [list(row) for row in rows]
|
||
}
|
||
|
||
# 备份其他相关表
|
||
related_tables = ['cookie_status', 'default_replies', 'message_notifications',
|
||
'item_info', 'ai_reply_settings', 'ai_conversations']
|
||
|
||
for table in related_tables:
|
||
cursor.execute(f"SELECT * FROM {table} WHERE cookie_id IN ({placeholders})", user_cookie_ids)
|
||
columns = [description[0] for description in cursor.description]
|
||
rows = cursor.fetchall()
|
||
backup_data['data'][table] = {
|
||
'columns': columns,
|
||
'rows': [list(row) for row in rows]
|
||
}
|
||
else:
|
||
# 系统级备份:备份所有数据
|
||
tables = [
|
||
'cookies', 'keywords', 'cookie_status', 'cards',
|
||
'delivery_rules', 'default_replies', 'notification_channels',
|
||
'message_notifications', 'system_settings', 'item_info',
|
||
'ai_reply_settings', 'ai_conversations', 'ai_item_cache'
|
||
]
|
||
|
||
for table in tables:
|
||
cursor.execute(f"SELECT * FROM {table}")
|
||
columns = [description[0] for description in cursor.description]
|
||
rows = cursor.fetchall()
|
||
|
||
backup_data['data'][table] = {
|
||
'columns': columns,
|
||
'rows': [list(row) for row in rows]
|
||
}
|
||
|
||
logger.info(f"导出备份成功,用户ID: {user_id}")
|
||
return backup_data
|
||
|
||
except Exception as e:
|
||
logger.error(f"导出备份失败: {e}")
|
||
raise
|
||
|
||
def import_backup(self, backup_data: Dict[str, any], user_id: int = None) -> bool:
|
||
"""导入系统备份数据(支持用户隔离)"""
|
||
with self.lock:
|
||
try:
|
||
# 验证备份数据格式
|
||
if not isinstance(backup_data, dict) or 'data' not in backup_data:
|
||
raise ValueError("备份数据格式无效")
|
||
|
||
# 开始事务
|
||
cursor = self.conn.cursor()
|
||
self._execute_sql(cursor, "BEGIN TRANSACTION")
|
||
|
||
if user_id is not None:
|
||
# 用户级导入:只清空该用户的数据
|
||
# 获取用户的cookie_id列表
|
||
self._execute_sql(cursor, "SELECT id FROM cookies WHERE user_id = ?", (user_id,))
|
||
user_cookie_ids = [row[0] for row in cursor.fetchall()]
|
||
|
||
if user_cookie_ids:
|
||
placeholders = ','.join(['?' for _ in user_cookie_ids])
|
||
|
||
# 删除用户相关数据
|
||
related_tables = ['message_notifications', 'default_replies', 'item_info',
|
||
'cookie_status', 'keywords', 'ai_conversations', 'ai_reply_settings']
|
||
|
||
for table in related_tables:
|
||
cursor.execute(f"DELETE FROM {table} WHERE cookie_id IN ({placeholders})", user_cookie_ids)
|
||
|
||
# 删除用户的cookies
|
||
self._execute_sql(cursor, "DELETE FROM cookies WHERE user_id = ?", (user_id,))
|
||
else:
|
||
# 系统级导入:清空所有数据(除了用户和管理员密码)
|
||
tables = [
|
||
'message_notifications', 'notification_channels', 'default_replies',
|
||
'delivery_rules', 'cards', 'item_info', 'cookie_status', 'keywords',
|
||
'ai_conversations', 'ai_reply_settings', 'ai_item_cache', 'cookies'
|
||
]
|
||
|
||
for table in tables:
|
||
cursor.execute(f"DELETE FROM {table}")
|
||
|
||
# 清空系统设置(保留管理员密码)
|
||
self._execute_sql(cursor, "DELETE FROM system_settings WHERE key != 'admin_password_hash'")
|
||
|
||
# 导入数据
|
||
data = backup_data['data']
|
||
for table_name, table_data in data.items():
|
||
if table_name not in ['cookies', 'keywords', 'cookie_status', 'cards',
|
||
'delivery_rules', 'default_replies', 'notification_channels',
|
||
'message_notifications', 'system_settings', 'item_info',
|
||
'ai_reply_settings', 'ai_conversations', 'ai_item_cache']:
|
||
continue
|
||
|
||
columns = table_data['columns']
|
||
rows = table_data['rows']
|
||
|
||
if not rows:
|
||
continue
|
||
|
||
# 如果是用户级导入,需要确保cookies表的user_id正确
|
||
if user_id is not None and table_name == 'cookies':
|
||
# 更新所有导入的cookies的user_id
|
||
updated_rows = []
|
||
for row in rows:
|
||
row_dict = dict(zip(columns, row))
|
||
row_dict['user_id'] = user_id
|
||
updated_rows.append([row_dict[col] for col in columns])
|
||
rows = updated_rows
|
||
|
||
# 构建插入语句
|
||
placeholders = ','.join(['?' for _ in columns])
|
||
|
||
if table_name == 'system_settings':
|
||
# 系统设置需要特殊处理,避免覆盖管理员密码
|
||
for row in rows:
|
||
if len(row) >= 1 and row[0] != 'admin_password_hash':
|
||
cursor.execute(f"INSERT INTO {table_name} ({','.join(columns)}) VALUES ({placeholders})", row)
|
||
else:
|
||
cursor.executemany(f"INSERT INTO {table_name} ({','.join(columns)}) VALUES ({placeholders})", rows)
|
||
|
||
# 提交事务
|
||
self.conn.commit()
|
||
logger.info("导入备份成功")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"导入备份失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
# -------------------- 系统设置操作 --------------------
|
||
def get_system_setting(self, key: str) -> Optional[str]:
|
||
"""获取系统设置"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
self._execute_sql(cursor, "SELECT value FROM system_settings WHERE key = ?", (key,))
|
||
result = cursor.fetchone()
|
||
return result[0] if result else None
|
||
except Exception as e:
|
||
logger.error(f"获取系统设置失败: {e}")
|
||
return None
|
||
|
||
def set_system_setting(self, key: str, value: str, description: str = None) -> bool:
|
||
"""设置系统设置"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
INSERT OR REPLACE INTO system_settings (key, value, description, updated_at)
|
||
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
|
||
''', (key, value, description))
|
||
self.conn.commit()
|
||
logger.debug(f"设置系统设置: {key}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"设置系统设置失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
def get_all_system_settings(self) -> Dict[str, str]:
|
||
"""获取所有系统设置"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
self._execute_sql(cursor, "SELECT key, value FROM system_settings")
|
||
|
||
settings = {}
|
||
for row in cursor.fetchall():
|
||
settings[row[0]] = row[1]
|
||
|
||
return settings
|
||
except Exception as e:
|
||
logger.error(f"获取所有系统设置失败: {e}")
|
||
return {}
|
||
|
||
# 管理员密码现在统一使用用户表管理,不再需要单独的方法
|
||
|
||
# ==================== 用户管理方法 ====================
|
||
|
||
def create_user(self, username: str, email: str, password: str) -> bool:
|
||
"""创建新用户"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
password_hash = hashlib.sha256(password.encode()).hexdigest()
|
||
|
||
cursor.execute('''
|
||
INSERT INTO users (username, email, password_hash)
|
||
VALUES (?, ?, ?)
|
||
''', (username, email, password_hash))
|
||
|
||
self.conn.commit()
|
||
logger.info(f"创建用户成功: {username} ({email})")
|
||
return True
|
||
except sqlite3.IntegrityError as e:
|
||
logger.error(f"创建用户失败,用户名或邮箱已存在: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"创建用户失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
def get_user_by_username(self, username: str) -> Optional[Dict[str, Any]]:
|
||
"""根据用户名获取用户信息"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
SELECT id, username, email, password_hash, is_active, created_at, updated_at
|
||
FROM users WHERE username = ?
|
||
''', (username,))
|
||
|
||
row = cursor.fetchone()
|
||
if row:
|
||
return {
|
||
'id': row[0],
|
||
'username': row[1],
|
||
'email': row[2],
|
||
'password_hash': row[3],
|
||
'is_active': row[4],
|
||
'created_at': row[5],
|
||
'updated_at': row[6]
|
||
}
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取用户信息失败: {e}")
|
||
return None
|
||
|
||
def get_user_by_email(self, email: str) -> Optional[Dict[str, Any]]:
|
||
"""根据邮箱获取用户信息"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
SELECT id, username, email, password_hash, is_active, created_at, updated_at
|
||
FROM users WHERE email = ?
|
||
''', (email,))
|
||
|
||
row = cursor.fetchone()
|
||
if row:
|
||
return {
|
||
'id': row[0],
|
||
'username': row[1],
|
||
'email': row[2],
|
||
'password_hash': row[3],
|
||
'is_active': row[4],
|
||
'created_at': row[5],
|
||
'updated_at': row[6]
|
||
}
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取用户信息失败: {e}")
|
||
return None
|
||
|
||
def verify_user_password(self, username: str, password: str) -> bool:
|
||
"""验证用户密码"""
|
||
user = self.get_user_by_username(username)
|
||
if not user:
|
||
return False
|
||
|
||
password_hash = hashlib.sha256(password.encode()).hexdigest()
|
||
return user['password_hash'] == password_hash and user['is_active']
|
||
|
||
def update_user_password(self, username: str, new_password: str) -> bool:
|
||
"""更新用户密码"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
password_hash = hashlib.sha256(new_password.encode()).hexdigest()
|
||
|
||
cursor.execute('''
|
||
UPDATE users SET password_hash = ?, updated_at = CURRENT_TIMESTAMP
|
||
WHERE username = ?
|
||
''', (password_hash, username))
|
||
|
||
if cursor.rowcount > 0:
|
||
self.conn.commit()
|
||
logger.info(f"用户 {username} 密码更新成功")
|
||
return True
|
||
else:
|
||
logger.warning(f"用户 {username} 不存在,密码更新失败")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"更新用户密码失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
def generate_verification_code(self) -> str:
|
||
"""生成6位数字验证码"""
|
||
return ''.join(random.choices(string.digits, k=6))
|
||
|
||
def generate_captcha(self) -> Tuple[str, str]:
|
||
"""生成图形验证码
|
||
返回: (验证码文本, base64编码的图片)
|
||
"""
|
||
try:
|
||
# 生成4位随机验证码(数字+字母)
|
||
chars = string.ascii_uppercase + string.digits
|
||
captcha_text = ''.join(random.choices(chars, k=4))
|
||
|
||
# 创建图片
|
||
width, height = 120, 40
|
||
image = Image.new('RGB', (width, height), color='white')
|
||
draw = ImageDraw.Draw(image)
|
||
|
||
# 尝试使用系统字体,如果失败则使用默认字体
|
||
try:
|
||
# Windows系统字体
|
||
font = ImageFont.truetype("arial.ttf", 20)
|
||
except:
|
||
try:
|
||
# 备用字体
|
||
font = ImageFont.truetype("C:/Windows/Fonts/arial.ttf", 20)
|
||
except:
|
||
# 使用默认字体
|
||
font = ImageFont.load_default()
|
||
|
||
# 绘制验证码文本
|
||
for i, char in enumerate(captcha_text):
|
||
# 随机颜色
|
||
color = (
|
||
random.randint(0, 100),
|
||
random.randint(0, 100),
|
||
random.randint(0, 100)
|
||
)
|
||
|
||
# 随机位置(稍微偏移)
|
||
x = 20 + i * 20 + random.randint(-3, 3)
|
||
y = 8 + random.randint(-3, 3)
|
||
|
||
draw.text((x, y), char, font=font, fill=color)
|
||
|
||
# 添加干扰线
|
||
for _ in range(3):
|
||
start = (random.randint(0, width), random.randint(0, height))
|
||
end = (random.randint(0, width), random.randint(0, height))
|
||
draw.line([start, end], fill=(random.randint(100, 200), random.randint(100, 200), random.randint(100, 200)), width=1)
|
||
|
||
# 添加干扰点
|
||
for _ in range(20):
|
||
x = random.randint(0, width)
|
||
y = random.randint(0, height)
|
||
draw.point((x, y), fill=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)))
|
||
|
||
# 转换为base64
|
||
buffer = io.BytesIO()
|
||
image.save(buffer, format='PNG')
|
||
img_base64 = base64.b64encode(buffer.getvalue()).decode()
|
||
|
||
return captcha_text, f"data:image/png;base64,{img_base64}"
|
||
|
||
except Exception as e:
|
||
logger.error(f"生成图形验证码失败: {e}")
|
||
# 返回简单的文本验证码作为备用
|
||
simple_code = ''.join(random.choices(string.digits, k=4))
|
||
return simple_code, ""
|
||
|
||
def save_captcha(self, session_id: str, captcha_text: str, expires_minutes: int = 5) -> bool:
|
||
"""保存图形验证码"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
expires_at = time.time() + (expires_minutes * 60)
|
||
|
||
# 删除该session的旧验证码
|
||
cursor.execute('DELETE FROM captcha_codes WHERE session_id = ?', (session_id,))
|
||
|
||
cursor.execute('''
|
||
INSERT INTO captcha_codes (session_id, code, expires_at)
|
||
VALUES (?, ?, ?)
|
||
''', (session_id, captcha_text.upper(), expires_at))
|
||
|
||
self.conn.commit()
|
||
logger.debug(f"保存图形验证码成功: {session_id}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"保存图形验证码失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
def verify_captcha(self, session_id: str, user_input: str) -> bool:
|
||
"""验证图形验证码"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
current_time = time.time()
|
||
|
||
# 查找有效的验证码
|
||
cursor.execute('''
|
||
SELECT id FROM captcha_codes
|
||
WHERE session_id = ? AND code = ? AND expires_at > ?
|
||
ORDER BY created_at DESC LIMIT 1
|
||
''', (session_id, user_input.upper(), current_time))
|
||
|
||
row = cursor.fetchone()
|
||
if row:
|
||
# 删除已使用的验证码
|
||
cursor.execute('DELETE FROM captcha_codes WHERE id = ?', (row[0],))
|
||
self.conn.commit()
|
||
logger.debug(f"图形验证码验证成功: {session_id}")
|
||
return True
|
||
else:
|
||
logger.warning(f"图形验证码验证失败: {session_id} - {user_input}")
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"验证图形验证码失败: {e}")
|
||
return False
|
||
|
||
def save_verification_code(self, email: str, code: str, code_type: str = 'register', expires_minutes: int = 10) -> bool:
|
||
"""保存邮箱验证码"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
expires_at = time.time() + (expires_minutes * 60)
|
||
|
||
cursor.execute('''
|
||
INSERT INTO email_verifications (email, code, type, expires_at)
|
||
VALUES (?, ?, ?, ?)
|
||
''', (email, code, code_type, expires_at))
|
||
|
||
self.conn.commit()
|
||
logger.info(f"保存验证码成功: {email} ({code_type})")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"保存验证码失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
def verify_email_code(self, email: str, code: str, code_type: str = 'register') -> bool:
|
||
"""验证邮箱验证码"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
current_time = time.time()
|
||
|
||
# 查找有效的验证码
|
||
cursor.execute('''
|
||
SELECT id FROM email_verifications
|
||
WHERE email = ? AND code = ? AND type = ? AND expires_at > ? AND used = FALSE
|
||
ORDER BY created_at DESC LIMIT 1
|
||
''', (email, code, code_type, current_time))
|
||
|
||
row = cursor.fetchone()
|
||
if row:
|
||
# 标记验证码为已使用
|
||
cursor.execute('''
|
||
UPDATE email_verifications SET used = TRUE WHERE id = ?
|
||
''', (row[0],))
|
||
self.conn.commit()
|
||
logger.info(f"验证码验证成功: {email} ({code_type})")
|
||
return True
|
||
else:
|
||
logger.warning(f"验证码验证失败: {email} - {code} ({code_type})")
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"验证邮箱验证码失败: {e}")
|
||
return False
|
||
|
||
async def send_verification_email(self, email: str, code: str) -> bool:
|
||
"""发送验证码邮件"""
|
||
try:
|
||
subject = "闲鱼自动回复系统 - 邮箱验证码"
|
||
# 使用简单的纯文本邮件内容
|
||
text_content = f"""【闲鱼自动回复系统】邮箱验证码
|
||
|
||
您好!
|
||
|
||
感谢您使用闲鱼自动回复系统。为了确保账户安全,请使用以下验证码完成邮箱验证:
|
||
|
||
验证码:{code}
|
||
|
||
重要提醒:
|
||
• 验证码有效期为 10 分钟,请及时使用
|
||
• 请勿将验证码分享给任何人
|
||
• 如非本人操作,请忽略此邮件
|
||
• 系统不会主动索要您的验证码
|
||
|
||
如果您在使用过程中遇到任何问题,请联系我们的技术支持团队。
|
||
感谢您选择闲鱼自动回复系统!
|
||
|
||
---
|
||
此邮件由系统自动发送,请勿直接回复
|
||
© 2025 闲鱼自动回复系统"""
|
||
|
||
# 使用GET请求发送邮件
|
||
api_url = "https://dy.zhinianboke.com/api/emailSend"
|
||
params = {
|
||
'subject': subject,
|
||
'receiveUser': email,
|
||
'sendHtml': text_content
|
||
}
|
||
|
||
async with aiohttp.ClientSession() as session:
|
||
try:
|
||
logger.info(f"发送验证码邮件: {email}")
|
||
async with session.get(api_url, params=params, timeout=15) as response:
|
||
response_text = await response.text()
|
||
logger.info(f"邮件API响应: {response.status}")
|
||
|
||
if response.status == 200:
|
||
logger.info(f"验证码邮件发送成功: {email}")
|
||
return True
|
||
else:
|
||
logger.error(f"验证码邮件发送失败: {email}, 状态码: {response.status}, 响应: {response_text[:200]}")
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"邮件发送异常: {email}, 错误: {e}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"发送验证码邮件异常: {e}")
|
||
return False
|
||
|
||
# ==================== 卡券管理方法 ====================
|
||
|
||
def create_card(self, name: str, card_type: str, api_config=None,
|
||
text_content: str = None, data_content: str = None,
|
||
description: str = None, enabled: bool = True, delay_seconds: int = 0,
|
||
is_multi_spec: bool = False, spec_name: str = None, spec_value: str = None,
|
||
user_id: int = None):
|
||
"""创建新卡券(支持多规格)"""
|
||
with self.lock:
|
||
try:
|
||
# 验证多规格参数
|
||
if is_multi_spec:
|
||
if not spec_name or not spec_value:
|
||
raise ValueError("多规格卡券必须提供规格名称和规格值")
|
||
|
||
# 检查唯一性:卡券名称+规格名称+规格值
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
SELECT COUNT(*) FROM cards
|
||
WHERE name = ? AND spec_name = ? AND spec_value = ? AND user_id = ?
|
||
''', (name, spec_name, spec_value, user_id))
|
||
|
||
if cursor.fetchone()[0] > 0:
|
||
raise ValueError(f"卡券已存在:{name} - {spec_name}:{spec_value}")
|
||
else:
|
||
# 检查唯一性:仅卡券名称
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
SELECT COUNT(*) FROM cards
|
||
WHERE name = ? AND (is_multi_spec = 0 OR is_multi_spec IS NULL) AND user_id = ?
|
||
''', (name, user_id))
|
||
|
||
if cursor.fetchone()[0] > 0:
|
||
raise ValueError(f"卡券名称已存在:{name}")
|
||
|
||
# 处理api_config参数 - 如果是字典则转换为JSON字符串
|
||
api_config_str = None
|
||
if api_config is not None:
|
||
if isinstance(api_config, dict):
|
||
import json
|
||
api_config_str = json.dumps(api_config)
|
||
else:
|
||
api_config_str = str(api_config)
|
||
|
||
cursor.execute('''
|
||
INSERT INTO cards (name, type, api_config, text_content, data_content,
|
||
description, enabled, delay_seconds, is_multi_spec,
|
||
spec_name, spec_value, user_id)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
''', (name, card_type, api_config_str, text_content, data_content,
|
||
description, enabled, delay_seconds, is_multi_spec,
|
||
spec_name, spec_value, user_id))
|
||
self.conn.commit()
|
||
card_id = cursor.lastrowid
|
||
|
||
if is_multi_spec:
|
||
logger.info(f"创建多规格卡券成功: {name} - {spec_name}:{spec_value} (ID: {card_id})")
|
||
else:
|
||
logger.info(f"创建卡券成功: {name} (ID: {card_id})")
|
||
return card_id
|
||
except Exception as e:
|
||
logger.error(f"创建卡券失败: {e}")
|
||
raise
|
||
|
||
def get_all_cards(self, user_id: int = None):
|
||
"""获取所有卡券(支持用户隔离)"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
if user_id is not None:
|
||
cursor.execute('''
|
||
SELECT id, name, type, api_config, text_content, data_content,
|
||
description, enabled, delay_seconds, is_multi_spec,
|
||
spec_name, spec_value, created_at, updated_at
|
||
FROM cards
|
||
WHERE user_id = ?
|
||
ORDER BY created_at DESC
|
||
''', (user_id,))
|
||
else:
|
||
cursor.execute('''
|
||
SELECT id, name, type, api_config, text_content, data_content,
|
||
description, enabled, delay_seconds, is_multi_spec,
|
||
spec_name, spec_value, created_at, updated_at
|
||
FROM cards
|
||
ORDER BY created_at DESC
|
||
''')
|
||
|
||
cards = []
|
||
for row in cursor.fetchall():
|
||
# 解析api_config JSON字符串
|
||
api_config = row[3]
|
||
if api_config:
|
||
try:
|
||
import json
|
||
api_config = json.loads(api_config)
|
||
except (json.JSONDecodeError, TypeError):
|
||
# 如果解析失败,保持原始字符串
|
||
pass
|
||
|
||
cards.append({
|
||
'id': row[0],
|
||
'name': row[1],
|
||
'type': row[2],
|
||
'api_config': api_config,
|
||
'text_content': row[4],
|
||
'data_content': row[5],
|
||
'description': row[6],
|
||
'enabled': bool(row[7]),
|
||
'delay_seconds': row[8] or 0,
|
||
'is_multi_spec': bool(row[9]) if row[9] is not None else False,
|
||
'spec_name': row[10],
|
||
'spec_value': row[11],
|
||
'created_at': row[12],
|
||
'updated_at': row[13]
|
||
})
|
||
|
||
return cards
|
||
except Exception as e:
|
||
logger.error(f"获取卡券列表失败: {e}")
|
||
return []
|
||
|
||
def get_card_by_id(self, card_id: int, user_id: int = None):
|
||
"""根据ID获取卡券(支持用户隔离)"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
if user_id is not None:
|
||
cursor.execute('''
|
||
SELECT id, name, type, api_config, text_content, data_content,
|
||
description, enabled, delay_seconds, is_multi_spec,
|
||
spec_name, spec_value, created_at, updated_at
|
||
FROM cards WHERE id = ? AND user_id = ?
|
||
''', (card_id, user_id))
|
||
else:
|
||
cursor.execute('''
|
||
SELECT id, name, type, api_config, text_content, data_content,
|
||
description, enabled, delay_seconds, is_multi_spec,
|
||
spec_name, spec_value, created_at, updated_at
|
||
FROM cards WHERE id = ?
|
||
''', (card_id,))
|
||
|
||
row = cursor.fetchone()
|
||
if row:
|
||
# 解析api_config JSON字符串
|
||
api_config = row[3]
|
||
if api_config:
|
||
try:
|
||
import json
|
||
api_config = json.loads(api_config)
|
||
except (json.JSONDecodeError, TypeError):
|
||
# 如果解析失败,保持原始字符串
|
||
pass
|
||
|
||
return {
|
||
'id': row[0],
|
||
'name': row[1],
|
||
'type': row[2],
|
||
'api_config': api_config,
|
||
'text_content': row[4],
|
||
'data_content': row[5],
|
||
'description': row[6],
|
||
'enabled': bool(row[7]),
|
||
'delay_seconds': row[8] or 0,
|
||
'is_multi_spec': bool(row[9]) if row[9] is not None else False,
|
||
'spec_name': row[10],
|
||
'spec_value': row[11],
|
||
'created_at': row[12],
|
||
'updated_at': row[13]
|
||
}
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取卡券失败: {e}")
|
||
return None
|
||
|
||
def update_card(self, card_id: int, name: str = None, card_type: str = None,
|
||
api_config=None, text_content: str = None, data_content: str = None,
|
||
description: str = None, enabled: bool = None, delay_seconds: int = None,
|
||
is_multi_spec: bool = None, spec_name: str = None, spec_value: str = None):
|
||
"""更新卡券"""
|
||
with self.lock:
|
||
try:
|
||
# 处理api_config参数
|
||
api_config_str = None
|
||
if api_config is not None:
|
||
if isinstance(api_config, dict):
|
||
import json
|
||
api_config_str = json.dumps(api_config)
|
||
else:
|
||
api_config_str = str(api_config)
|
||
|
||
cursor = self.conn.cursor()
|
||
|
||
# 构建更新语句
|
||
update_fields = []
|
||
params = []
|
||
|
||
if name is not None:
|
||
update_fields.append("name = ?")
|
||
params.append(name)
|
||
if card_type is not None:
|
||
update_fields.append("type = ?")
|
||
params.append(card_type)
|
||
if api_config_str is not None:
|
||
update_fields.append("api_config = ?")
|
||
params.append(api_config_str)
|
||
if text_content is not None:
|
||
update_fields.append("text_content = ?")
|
||
params.append(text_content)
|
||
if data_content is not None:
|
||
update_fields.append("data_content = ?")
|
||
params.append(data_content)
|
||
if description is not None:
|
||
update_fields.append("description = ?")
|
||
params.append(description)
|
||
if enabled is not None:
|
||
update_fields.append("enabled = ?")
|
||
params.append(enabled)
|
||
if delay_seconds is not None:
|
||
update_fields.append("delay_seconds = ?")
|
||
params.append(delay_seconds)
|
||
if is_multi_spec is not None:
|
||
update_fields.append("is_multi_spec = ?")
|
||
params.append(is_multi_spec)
|
||
if spec_name is not None:
|
||
update_fields.append("spec_name = ?")
|
||
params.append(spec_name)
|
||
if spec_value is not None:
|
||
update_fields.append("spec_value = ?")
|
||
params.append(spec_value)
|
||
|
||
if not update_fields:
|
||
return True # 没有需要更新的字段
|
||
|
||
update_fields.append("updated_at = CURRENT_TIMESTAMP")
|
||
params.append(card_id)
|
||
|
||
sql = f"UPDATE cards SET {', '.join(update_fields)} WHERE id = ?"
|
||
self._execute_sql(cursor, sql, params)
|
||
|
||
if cursor.rowcount > 0:
|
||
self.conn.commit()
|
||
logger.info(f"更新卡券成功: ID {card_id}")
|
||
return True
|
||
else:
|
||
return False # 没有找到对应的记录
|
||
|
||
except Exception as e:
|
||
logger.error(f"更新卡券失败: {e}")
|
||
self.conn.rollback()
|
||
raise
|
||
|
||
# ==================== 自动发货规则方法 ====================
|
||
|
||
def create_delivery_rule(self, keyword: str, card_id: int, delivery_count: int = 1,
|
||
enabled: bool = True, description: str = None, user_id: int = None):
|
||
"""创建发货规则"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
INSERT INTO delivery_rules (keyword, card_id, delivery_count, enabled, description, user_id)
|
||
VALUES (?, ?, ?, ?, ?, ?)
|
||
''', (keyword, card_id, delivery_count, enabled, description, user_id))
|
||
self.conn.commit()
|
||
rule_id = cursor.lastrowid
|
||
logger.info(f"创建发货规则成功: {keyword} -> 卡券ID {card_id} (规则ID: {rule_id})")
|
||
return rule_id
|
||
except Exception as e:
|
||
logger.error(f"创建发货规则失败: {e}")
|
||
raise
|
||
|
||
def get_all_delivery_rules(self, user_id: int = None):
|
||
"""获取所有发货规则"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
if user_id is not None:
|
||
cursor.execute('''
|
||
SELECT dr.id, dr.keyword, dr.card_id, dr.delivery_count, dr.enabled,
|
||
dr.description, dr.delivery_times, dr.created_at, dr.updated_at,
|
||
c.name as card_name, c.type as card_type,
|
||
c.is_multi_spec, c.spec_name, c.spec_value
|
||
FROM delivery_rules dr
|
||
LEFT JOIN cards c ON dr.card_id = c.id
|
||
WHERE dr.user_id = ?
|
||
ORDER BY dr.created_at DESC
|
||
''', (user_id,))
|
||
else:
|
||
cursor.execute('''
|
||
SELECT dr.id, dr.keyword, dr.card_id, dr.delivery_count, dr.enabled,
|
||
dr.description, dr.delivery_times, dr.created_at, dr.updated_at,
|
||
c.name as card_name, c.type as card_type,
|
||
c.is_multi_spec, c.spec_name, c.spec_value
|
||
FROM delivery_rules dr
|
||
LEFT JOIN cards c ON dr.card_id = c.id
|
||
ORDER BY dr.created_at DESC
|
||
''')
|
||
|
||
rules = []
|
||
for row in cursor.fetchall():
|
||
rules.append({
|
||
'id': row[0],
|
||
'keyword': row[1],
|
||
'card_id': row[2],
|
||
'delivery_count': row[3],
|
||
'enabled': bool(row[4]),
|
||
'description': row[5],
|
||
'delivery_times': row[6],
|
||
'created_at': row[7],
|
||
'updated_at': row[8],
|
||
'card_name': row[9],
|
||
'card_type': row[10],
|
||
'is_multi_spec': bool(row[11]) if row[11] is not None else False,
|
||
'spec_name': row[12],
|
||
'spec_value': row[13]
|
||
})
|
||
|
||
return rules
|
||
except Exception as e:
|
||
logger.error(f"获取发货规则列表失败: {e}")
|
||
return []
|
||
|
||
def get_delivery_rules_by_keyword(self, keyword: str):
|
||
"""根据关键字获取匹配的发货规则"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
# 使用更灵活的匹配方式:既支持商品内容包含关键字,也支持关键字包含在商品内容中
|
||
cursor.execute('''
|
||
SELECT dr.id, dr.keyword, dr.card_id, dr.delivery_count, dr.enabled,
|
||
dr.description, dr.delivery_times,
|
||
c.name as card_name, c.type as card_type, c.api_config,
|
||
c.text_content, c.data_content, c.enabled as card_enabled, c.description as card_description,
|
||
c.delay_seconds as card_delay_seconds
|
||
FROM delivery_rules dr
|
||
LEFT JOIN cards c ON dr.card_id = c.id
|
||
WHERE dr.enabled = 1 AND c.enabled = 1
|
||
AND (? LIKE '%' || dr.keyword || '%' OR dr.keyword LIKE '%' || ? || '%')
|
||
ORDER BY
|
||
CASE
|
||
WHEN ? LIKE '%' || dr.keyword || '%' THEN LENGTH(dr.keyword)
|
||
ELSE LENGTH(dr.keyword) / 2
|
||
END DESC,
|
||
dr.id ASC
|
||
''', (keyword, keyword, keyword))
|
||
|
||
rules = []
|
||
for row in cursor.fetchall():
|
||
# 解析api_config JSON字符串
|
||
api_config = row[9]
|
||
if api_config:
|
||
try:
|
||
import json
|
||
api_config = json.loads(api_config)
|
||
except (json.JSONDecodeError, TypeError):
|
||
# 如果解析失败,保持原始字符串
|
||
pass
|
||
|
||
rules.append({
|
||
'id': row[0],
|
||
'keyword': row[1],
|
||
'card_id': row[2],
|
||
'delivery_count': row[3],
|
||
'enabled': bool(row[4]),
|
||
'description': row[5],
|
||
'delivery_times': row[6],
|
||
'card_name': row[7],
|
||
'card_type': row[8],
|
||
'api_config': api_config, # 修复字段名
|
||
'text_content': row[10],
|
||
'data_content': row[11],
|
||
'card_enabled': bool(row[12]),
|
||
'card_description': row[13], # 卡券备注信息
|
||
'card_delay_seconds': row[14] or 0 # 延时秒数
|
||
})
|
||
|
||
return rules
|
||
except Exception as e:
|
||
logger.error(f"根据关键字获取发货规则失败: {e}")
|
||
return []
|
||
|
||
def get_delivery_rule_by_id(self, rule_id: int, user_id: int = None):
|
||
"""根据ID获取发货规则(支持用户隔离)"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
if user_id is not None:
|
||
self._execute_sql(cursor, '''
|
||
SELECT dr.id, dr.keyword, dr.card_id, dr.delivery_count, dr.enabled,
|
||
dr.description, dr.delivery_times, dr.created_at, dr.updated_at,
|
||
c.name as card_name, c.type as card_type
|
||
FROM delivery_rules dr
|
||
LEFT JOIN cards c ON dr.card_id = c.id
|
||
WHERE dr.id = ? AND dr.user_id = ?
|
||
''', (rule_id, user_id))
|
||
else:
|
||
self._execute_sql(cursor, '''
|
||
SELECT dr.id, dr.keyword, dr.card_id, dr.delivery_count, dr.enabled,
|
||
dr.description, dr.delivery_times, dr.created_at, dr.updated_at,
|
||
c.name as card_name, c.type as card_type
|
||
FROM delivery_rules dr
|
||
LEFT JOIN cards c ON dr.card_id = c.id
|
||
WHERE dr.id = ?
|
||
''', (rule_id,))
|
||
|
||
row = cursor.fetchone()
|
||
if row:
|
||
return {
|
||
'id': row[0],
|
||
'keyword': row[1],
|
||
'card_id': row[2],
|
||
'delivery_count': row[3],
|
||
'enabled': bool(row[4]),
|
||
'description': row[5],
|
||
'delivery_times': row[6],
|
||
'created_at': row[7],
|
||
'updated_at': row[8],
|
||
'card_name': row[9],
|
||
'card_type': row[10]
|
||
}
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取发货规则失败: {e}")
|
||
return None
|
||
|
||
def update_delivery_rule(self, rule_id: int, keyword: str = None, card_id: int = None,
|
||
delivery_count: int = None, enabled: bool = None,
|
||
description: str = None, user_id: int = None):
|
||
"""更新发货规则(支持用户隔离)"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
|
||
# 构建更新语句
|
||
update_fields = []
|
||
params = []
|
||
|
||
if keyword is not None:
|
||
update_fields.append("keyword = ?")
|
||
params.append(keyword)
|
||
if card_id is not None:
|
||
update_fields.append("card_id = ?")
|
||
params.append(card_id)
|
||
if delivery_count is not None:
|
||
update_fields.append("delivery_count = ?")
|
||
params.append(delivery_count)
|
||
if enabled is not None:
|
||
update_fields.append("enabled = ?")
|
||
params.append(enabled)
|
||
if description is not None:
|
||
update_fields.append("description = ?")
|
||
params.append(description)
|
||
|
||
if not update_fields:
|
||
return True # 没有需要更新的字段
|
||
|
||
update_fields.append("updated_at = CURRENT_TIMESTAMP")
|
||
params.append(rule_id)
|
||
|
||
if user_id is not None:
|
||
params.append(user_id)
|
||
sql = f"UPDATE delivery_rules SET {', '.join(update_fields)} WHERE id = ? AND user_id = ?"
|
||
else:
|
||
sql = f"UPDATE delivery_rules SET {', '.join(update_fields)} WHERE id = ?"
|
||
|
||
self._execute_sql(cursor, sql, params)
|
||
|
||
if cursor.rowcount > 0:
|
||
self.conn.commit()
|
||
logger.info(f"更新发货规则成功: ID {rule_id}")
|
||
return True
|
||
else:
|
||
return False # 没有找到对应的记录
|
||
|
||
except Exception as e:
|
||
logger.error(f"更新发货规则失败: {e}")
|
||
self.conn.rollback()
|
||
raise
|
||
|
||
def increment_delivery_times(self, rule_id: int):
|
||
"""增加发货次数"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
UPDATE delivery_rules
|
||
SET delivery_times = delivery_times + 1, updated_at = CURRENT_TIMESTAMP
|
||
WHERE id = ?
|
||
''', (rule_id,))
|
||
self.conn.commit()
|
||
logger.debug(f"发货规则 {rule_id} 发货次数已增加")
|
||
except Exception as e:
|
||
logger.error(f"更新发货次数失败: {e}")
|
||
|
||
def get_delivery_rules_by_keyword_and_spec(self, keyword: str, spec_name: str = None, spec_value: str = None):
|
||
"""根据关键字和规格信息获取匹配的发货规则(支持多规格)"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
|
||
# 优先匹配:卡券名称+规格名称+规格值
|
||
if spec_name and spec_value:
|
||
cursor.execute('''
|
||
SELECT dr.id, dr.keyword, dr.card_id, dr.delivery_count, dr.enabled,
|
||
dr.description, dr.delivery_times,
|
||
c.name as card_name, c.type as card_type, c.api_config,
|
||
c.text_content, c.data_content, c.enabled as card_enabled,
|
||
c.description as card_description, c.delay_seconds as card_delay_seconds,
|
||
c.is_multi_spec, c.spec_name, c.spec_value
|
||
FROM delivery_rules dr
|
||
LEFT JOIN cards c ON dr.card_id = c.id
|
||
WHERE dr.enabled = 1 AND c.enabled = 1
|
||
AND (? LIKE '%' || dr.keyword || '%' OR dr.keyword LIKE '%' || ? || '%')
|
||
AND c.is_multi_spec = 1 AND c.spec_name = ? AND c.spec_value = ?
|
||
ORDER BY
|
||
CASE
|
||
WHEN ? LIKE '%' || dr.keyword || '%' THEN LENGTH(dr.keyword)
|
||
ELSE LENGTH(dr.keyword) / 2
|
||
END DESC,
|
||
dr.delivery_times ASC
|
||
''', (keyword, keyword, spec_name, spec_value, keyword))
|
||
|
||
rules = []
|
||
for row in cursor.fetchall():
|
||
# 解析api_config JSON字符串
|
||
api_config = row[9]
|
||
if api_config:
|
||
try:
|
||
import json
|
||
api_config = json.loads(api_config)
|
||
except (json.JSONDecodeError, TypeError):
|
||
# 如果解析失败,保持原始字符串
|
||
pass
|
||
|
||
rules.append({
|
||
'id': row[0],
|
||
'keyword': row[1],
|
||
'card_id': row[2],
|
||
'delivery_count': row[3],
|
||
'enabled': bool(row[4]),
|
||
'description': row[5],
|
||
'delivery_times': row[6] or 0,
|
||
'card_name': row[7],
|
||
'card_type': row[8],
|
||
'api_config': api_config,
|
||
'text_content': row[10],
|
||
'data_content': row[11],
|
||
'card_enabled': bool(row[12]),
|
||
'card_description': row[13],
|
||
'card_delay_seconds': row[14] or 0,
|
||
'is_multi_spec': bool(row[15]),
|
||
'spec_name': row[16],
|
||
'spec_value': row[17]
|
||
})
|
||
|
||
if rules:
|
||
logger.info(f"找到多规格匹配规则: {keyword} - {spec_name}:{spec_value}")
|
||
return rules
|
||
|
||
# 兜底匹配:仅卡券名称
|
||
cursor.execute('''
|
||
SELECT dr.id, dr.keyword, dr.card_id, dr.delivery_count, dr.enabled,
|
||
dr.description, dr.delivery_times,
|
||
c.name as card_name, c.type as card_type, c.api_config,
|
||
c.text_content, c.data_content, c.enabled as card_enabled,
|
||
c.description as card_description, c.delay_seconds as card_delay_seconds,
|
||
c.is_multi_spec, c.spec_name, c.spec_value
|
||
FROM delivery_rules dr
|
||
LEFT JOIN cards c ON dr.card_id = c.id
|
||
WHERE dr.enabled = 1 AND c.enabled = 1
|
||
AND (? LIKE '%' || dr.keyword || '%' OR dr.keyword LIKE '%' || ? || '%')
|
||
AND (c.is_multi_spec = 0 OR c.is_multi_spec IS NULL)
|
||
ORDER BY
|
||
CASE
|
||
WHEN ? LIKE '%' || dr.keyword || '%' THEN LENGTH(dr.keyword)
|
||
ELSE LENGTH(dr.keyword) / 2
|
||
END DESC,
|
||
dr.delivery_times ASC
|
||
''', (keyword, keyword, keyword))
|
||
|
||
rules = []
|
||
for row in cursor.fetchall():
|
||
# 解析api_config JSON字符串
|
||
api_config = row[9]
|
||
if api_config:
|
||
try:
|
||
import json
|
||
api_config = json.loads(api_config)
|
||
except (json.JSONDecodeError, TypeError):
|
||
# 如果解析失败,保持原始字符串
|
||
pass
|
||
|
||
rules.append({
|
||
'id': row[0],
|
||
'keyword': row[1],
|
||
'card_id': row[2],
|
||
'delivery_count': row[3],
|
||
'enabled': bool(row[4]),
|
||
'description': row[5],
|
||
'delivery_times': row[6] or 0,
|
||
'card_name': row[7],
|
||
'card_type': row[8],
|
||
'api_config': api_config,
|
||
'text_content': row[10],
|
||
'data_content': row[11],
|
||
'card_enabled': bool(row[12]),
|
||
'card_description': row[13],
|
||
'card_delay_seconds': row[14] or 0,
|
||
'is_multi_spec': bool(row[15]) if row[15] is not None else False,
|
||
'spec_name': row[16],
|
||
'spec_value': row[17]
|
||
})
|
||
|
||
if rules:
|
||
logger.info(f"找到兜底匹配规则: {keyword}")
|
||
else:
|
||
logger.info(f"未找到匹配规则: {keyword}")
|
||
|
||
return rules
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取发货规则失败: {e}")
|
||
return []
|
||
|
||
def delete_card(self, card_id: int):
|
||
"""删除卡券"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
self._execute_sql(cursor, "DELETE FROM cards WHERE id = ?", (card_id,))
|
||
|
||
if cursor.rowcount > 0:
|
||
self.conn.commit()
|
||
logger.info(f"删除卡券成功: ID {card_id}")
|
||
return True
|
||
else:
|
||
return False # 没有找到对应的记录
|
||
|
||
except Exception as e:
|
||
logger.error(f"删除卡券失败: {e}")
|
||
self.conn.rollback()
|
||
raise
|
||
|
||
def delete_delivery_rule(self, rule_id: int, user_id: int = None):
|
||
"""删除发货规则(支持用户隔离)"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
if user_id is not None:
|
||
self._execute_sql(cursor, "DELETE FROM delivery_rules WHERE id = ? AND user_id = ?", (rule_id, user_id))
|
||
else:
|
||
self._execute_sql(cursor, "DELETE FROM delivery_rules WHERE id = ?", (rule_id,))
|
||
|
||
if cursor.rowcount > 0:
|
||
self.conn.commit()
|
||
logger.info(f"删除发货规则成功: ID {rule_id} (用户ID: {user_id})")
|
||
return True
|
||
else:
|
||
return False # 没有找到对应的记录
|
||
|
||
except Exception as e:
|
||
logger.error(f"删除发货规则失败: {e}")
|
||
self.conn.rollback()
|
||
raise
|
||
|
||
def consume_batch_data(self, card_id: int):
|
||
"""消费批量数据的第一条记录(线程安全)"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
|
||
# 获取卡券的批量数据
|
||
self._execute_sql(cursor, "SELECT data_content FROM cards WHERE id = ? AND type = 'data'", (card_id,))
|
||
result = cursor.fetchone()
|
||
|
||
if not result or not result[0]:
|
||
logger.warning(f"卡券 {card_id} 没有批量数据")
|
||
return None
|
||
|
||
data_content = result[0]
|
||
lines = [line.strip() for line in data_content.split('\n') if line.strip()]
|
||
|
||
if not lines:
|
||
logger.warning(f"卡券 {card_id} 批量数据为空")
|
||
return None
|
||
|
||
# 获取第一条数据
|
||
first_line = lines[0]
|
||
|
||
# 移除第一条数据,更新数据库
|
||
remaining_lines = lines[1:]
|
||
new_data_content = '\n'.join(remaining_lines)
|
||
|
||
cursor.execute('''
|
||
UPDATE cards
|
||
SET data_content = ?, updated_at = CURRENT_TIMESTAMP
|
||
WHERE id = ?
|
||
''', (new_data_content, card_id))
|
||
|
||
self.conn.commit()
|
||
|
||
logger.info(f"消费批量数据成功: 卡券ID={card_id}, 剩余={len(remaining_lines)}条")
|
||
return first_line
|
||
|
||
except Exception as e:
|
||
logger.error(f"消费批量数据失败: {e}")
|
||
self.conn.rollback()
|
||
return None
|
||
|
||
# ==================== 商品信息管理 ====================
|
||
|
||
def save_item_basic_info(self, cookie_id: str, item_id: str, item_title: str = None,
|
||
item_description: str = None, item_category: str = None,
|
||
item_price: str = None, item_detail: str = None) -> bool:
|
||
"""保存或更新商品基本信息,使用原子操作避免并发问题
|
||
|
||
Args:
|
||
cookie_id: Cookie ID
|
||
item_id: 商品ID
|
||
item_title: 商品标题
|
||
item_description: 商品描述
|
||
item_category: 商品分类
|
||
item_price: 商品价格
|
||
item_detail: 商品详情JSON
|
||
|
||
Returns:
|
||
bool: 操作是否成功
|
||
"""
|
||
try:
|
||
with self.lock:
|
||
cursor = self.conn.cursor()
|
||
|
||
# 使用 INSERT OR IGNORE + UPDATE 的原子操作模式
|
||
# 首先尝试插入,如果已存在则忽略
|
||
cursor.execute('''
|
||
INSERT OR IGNORE INTO item_info (cookie_id, item_id, item_title, item_description,
|
||
item_category, item_price, item_detail, created_at, updated_at)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
||
''', (cookie_id, item_id, item_title or '', item_description or '',
|
||
item_category or '', item_price or '', item_detail or ''))
|
||
|
||
# 如果是新插入的记录,直接返回成功
|
||
if cursor.rowcount > 0:
|
||
self.conn.commit()
|
||
logger.info(f"新增商品基本信息: {item_id} - {item_title}")
|
||
return True
|
||
|
||
# 记录已存在,使用原子UPDATE操作,只更新非空字段且不覆盖现有非空值
|
||
update_parts = []
|
||
params = []
|
||
|
||
# 使用 CASE WHEN 语句进行条件更新,避免覆盖现有数据
|
||
if item_title:
|
||
update_parts.append("item_title = CASE WHEN (item_title IS NULL OR item_title = '') THEN ? ELSE item_title END")
|
||
params.append(item_title)
|
||
|
||
if item_description:
|
||
update_parts.append("item_description = CASE WHEN (item_description IS NULL OR item_description = '') THEN ? ELSE item_description END")
|
||
params.append(item_description)
|
||
|
||
if item_category:
|
||
update_parts.append("item_category = CASE WHEN (item_category IS NULL OR item_category = '') THEN ? ELSE item_category END")
|
||
params.append(item_category)
|
||
|
||
if item_price:
|
||
update_parts.append("item_price = CASE WHEN (item_price IS NULL OR item_price = '') THEN ? ELSE item_price END")
|
||
params.append(item_price)
|
||
|
||
# 对于item_detail,只有在现有值为空时才更新
|
||
if item_detail:
|
||
update_parts.append("item_detail = CASE WHEN (item_detail IS NULL OR item_detail = '' OR TRIM(item_detail) = '') THEN ? ELSE item_detail END")
|
||
params.append(item_detail)
|
||
|
||
if update_parts:
|
||
update_parts.append("updated_at = CURRENT_TIMESTAMP")
|
||
params.extend([cookie_id, item_id])
|
||
|
||
sql = f"UPDATE item_info SET {', '.join(update_parts)} WHERE cookie_id = ? AND item_id = ?"
|
||
self._execute_sql(cursor, sql, params)
|
||
|
||
if cursor.rowcount > 0:
|
||
logger.info(f"更新商品基本信息: {item_id} - {item_title}")
|
||
else:
|
||
logger.debug(f"商品信息无需更新: {item_id}")
|
||
|
||
self.conn.commit()
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"保存商品基本信息失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
def save_item_info(self, cookie_id: str, item_id: str, item_data = None) -> bool:
|
||
"""保存或更新商品信息
|
||
|
||
Args:
|
||
cookie_id: Cookie ID
|
||
item_id: 商品ID
|
||
item_data: 商品详情数据,可以是字符串或字典,也可以为None
|
||
|
||
Returns:
|
||
bool: 操作是否成功
|
||
"""
|
||
try:
|
||
with self.lock:
|
||
cursor = self.conn.cursor()
|
||
|
||
# 检查商品是否已存在
|
||
cursor.execute('''
|
||
SELECT id, item_detail FROM item_info
|
||
WHERE cookie_id = ? AND item_id = ?
|
||
''', (cookie_id, item_id))
|
||
|
||
existing = cursor.fetchone()
|
||
|
||
if existing:
|
||
# 如果传入的商品详情有值,则用最新数据覆盖
|
||
if item_data is not None and item_data:
|
||
# 处理字符串类型的详情数据
|
||
if isinstance(item_data, str):
|
||
cursor.execute('''
|
||
UPDATE item_info SET
|
||
item_detail = ?, updated_at = CURRENT_TIMESTAMP
|
||
WHERE cookie_id = ? AND item_id = ?
|
||
''', (item_data, cookie_id, item_id))
|
||
else:
|
||
# 处理字典类型的详情数据(向后兼容)
|
||
cursor.execute('''
|
||
UPDATE item_info SET
|
||
item_title = ?, item_description = ?, item_category = ?,
|
||
item_price = ?, item_detail = ?, updated_at = CURRENT_TIMESTAMP
|
||
WHERE cookie_id = ? AND item_id = ?
|
||
''', (
|
||
item_data.get('title', ''),
|
||
item_data.get('description', ''),
|
||
item_data.get('category', ''),
|
||
item_data.get('price', ''),
|
||
json.dumps(item_data, ensure_ascii=False),
|
||
cookie_id, item_id
|
||
))
|
||
logger.info(f"更新商品信息(覆盖): {item_id}")
|
||
else:
|
||
# 如果商品详情没有数据,则不更新,只记录存在
|
||
logger.debug(f"商品信息已存在,无新数据,跳过更新: {item_id}")
|
||
return True
|
||
else:
|
||
# 新增商品信息
|
||
if isinstance(item_data, str):
|
||
# 直接保存字符串详情
|
||
cursor.execute('''
|
||
INSERT INTO item_info (cookie_id, item_id, item_detail)
|
||
VALUES (?, ?, ?)
|
||
''', (cookie_id, item_id, item_data))
|
||
else:
|
||
# 处理字典类型的详情数据(向后兼容)
|
||
cursor.execute('''
|
||
INSERT INTO item_info (cookie_id, item_id, item_title, item_description,
|
||
item_category, item_price, item_detail)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||
''', (
|
||
cookie_id, item_id,
|
||
item_data.get('title', '') if item_data else '',
|
||
item_data.get('description', '') if item_data else '',
|
||
item_data.get('category', '') if item_data else '',
|
||
item_data.get('price', '') if item_data else '',
|
||
json.dumps(item_data, ensure_ascii=False) if item_data else ''
|
||
))
|
||
logger.info(f"新增商品信息: {item_id}")
|
||
|
||
self.conn.commit()
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"保存商品信息失败: {e}")
|
||
return False
|
||
|
||
def get_item_info(self, cookie_id: str, item_id: str) -> Optional[Dict]:
|
||
"""获取商品信息
|
||
|
||
Args:
|
||
cookie_id: Cookie ID
|
||
item_id: 商品ID
|
||
|
||
Returns:
|
||
Dict: 商品信息,如果不存在返回None
|
||
"""
|
||
try:
|
||
with self.lock:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
SELECT * FROM item_info
|
||
WHERE cookie_id = ? AND item_id = ?
|
||
''', (cookie_id, item_id))
|
||
|
||
row = cursor.fetchone()
|
||
if row:
|
||
columns = [description[0] for description in cursor.description]
|
||
item_info = dict(zip(columns, row))
|
||
|
||
# 解析item_detail JSON
|
||
if item_info.get('item_detail'):
|
||
try:
|
||
item_info['item_detail_parsed'] = json.loads(item_info['item_detail'])
|
||
except:
|
||
item_info['item_detail_parsed'] = {}
|
||
|
||
return item_info
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取商品信息失败: {e}")
|
||
return None
|
||
|
||
def update_item_multi_spec_status(self, cookie_id: str, item_id: str, is_multi_spec: bool) -> bool:
|
||
"""更新商品的多规格状态"""
|
||
try:
|
||
with self.lock:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
UPDATE item_info
|
||
SET is_multi_spec = ?, updated_at = CURRENT_TIMESTAMP
|
||
WHERE cookie_id = ? AND item_id = ?
|
||
''', (is_multi_spec, cookie_id, item_id))
|
||
|
||
if cursor.rowcount > 0:
|
||
self.conn.commit()
|
||
logger.info(f"更新商品多规格状态成功: {item_id} -> {is_multi_spec}")
|
||
return True
|
||
else:
|
||
logger.warning(f"商品不存在,无法更新多规格状态: {item_id}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"更新商品多规格状态失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
def get_item_multi_spec_status(self, cookie_id: str, item_id: str) -> bool:
|
||
"""获取商品的多规格状态"""
|
||
try:
|
||
with self.lock:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
SELECT is_multi_spec FROM item_info
|
||
WHERE cookie_id = ? AND item_id = ?
|
||
''', (cookie_id, item_id))
|
||
|
||
row = cursor.fetchone()
|
||
if row:
|
||
return bool(row[0]) if row[0] is not None else False
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取商品多规格状态失败: {e}")
|
||
return False
|
||
|
||
def get_items_by_cookie(self, cookie_id: str) -> List[Dict]:
|
||
"""获取指定Cookie的所有商品信息
|
||
|
||
Args:
|
||
cookie_id: Cookie ID
|
||
|
||
Returns:
|
||
List[Dict]: 商品信息列表
|
||
"""
|
||
try:
|
||
with self.lock:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
SELECT * FROM item_info
|
||
WHERE cookie_id = ?
|
||
ORDER BY updated_at DESC
|
||
''', (cookie_id,))
|
||
|
||
columns = [description[0] for description in cursor.description]
|
||
items = []
|
||
|
||
for row in cursor.fetchall():
|
||
item_info = dict(zip(columns, row))
|
||
|
||
# 解析item_detail JSON
|
||
if item_info.get('item_detail'):
|
||
try:
|
||
item_info['item_detail_parsed'] = json.loads(item_info['item_detail'])
|
||
except:
|
||
item_info['item_detail_parsed'] = {}
|
||
|
||
items.append(item_info)
|
||
|
||
return items
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取Cookie商品信息失败: {e}")
|
||
return []
|
||
|
||
def get_all_items(self) -> List[Dict]:
|
||
"""获取所有商品信息
|
||
|
||
Returns:
|
||
List[Dict]: 所有商品信息列表
|
||
"""
|
||
try:
|
||
with self.lock:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
SELECT * FROM item_info
|
||
ORDER BY updated_at DESC
|
||
''')
|
||
|
||
columns = [description[0] for description in cursor.description]
|
||
items = []
|
||
|
||
for row in cursor.fetchall():
|
||
item_info = dict(zip(columns, row))
|
||
|
||
# 解析item_detail JSON
|
||
if item_info.get('item_detail'):
|
||
try:
|
||
item_info['item_detail_parsed'] = json.loads(item_info['item_detail'])
|
||
except:
|
||
item_info['item_detail_parsed'] = {}
|
||
|
||
items.append(item_info)
|
||
|
||
return items
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取所有商品信息失败: {e}")
|
||
return []
|
||
|
||
def update_item_detail(self, cookie_id: str, item_id: str, item_detail: str) -> bool:
|
||
"""更新商品详情(不覆盖商品标题等基本信息)
|
||
|
||
Args:
|
||
cookie_id: Cookie ID
|
||
item_id: 商品ID
|
||
item_detail: 商品详情JSON字符串
|
||
|
||
Returns:
|
||
bool: 操作是否成功
|
||
"""
|
||
try:
|
||
with self.lock:
|
||
cursor = self.conn.cursor()
|
||
# 只更新item_detail字段,不影响其他字段
|
||
cursor.execute('''
|
||
UPDATE item_info SET
|
||
item_detail = ?, updated_at = CURRENT_TIMESTAMP
|
||
WHERE cookie_id = ? AND item_id = ?
|
||
''', (item_detail, cookie_id, item_id))
|
||
|
||
if cursor.rowcount > 0:
|
||
self.conn.commit()
|
||
logger.info(f"更新商品详情成功: {item_id}")
|
||
return True
|
||
else:
|
||
logger.warning(f"未找到要更新的商品: {item_id}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"更新商品详情失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
def update_item_title_only(self, cookie_id: str, item_id: str, item_title: str) -> bool:
|
||
"""仅更新商品标题(并发安全)
|
||
|
||
Args:
|
||
cookie_id: Cookie ID
|
||
item_id: 商品ID
|
||
item_title: 商品标题
|
||
|
||
Returns:
|
||
bool: 操作是否成功
|
||
"""
|
||
try:
|
||
with self.lock:
|
||
cursor = self.conn.cursor()
|
||
# 使用 INSERT OR REPLACE 确保记录存在,但只更新标题字段
|
||
cursor.execute('''
|
||
INSERT INTO item_info (cookie_id, item_id, item_title, item_description,
|
||
item_category, item_price, item_detail, created_at, updated_at)
|
||
VALUES (?, ?, ?,
|
||
COALESCE((SELECT item_description FROM item_info WHERE cookie_id = ? AND item_id = ?), ''),
|
||
COALESCE((SELECT item_category FROM item_info WHERE cookie_id = ? AND item_id = ?), ''),
|
||
COALESCE((SELECT item_price FROM item_info WHERE cookie_id = ? AND item_id = ?), ''),
|
||
COALESCE((SELECT item_detail FROM item_info WHERE cookie_id = ? AND item_id = ?), ''),
|
||
COALESCE((SELECT created_at FROM item_info WHERE cookie_id = ? AND item_id = ?), CURRENT_TIMESTAMP),
|
||
CURRENT_TIMESTAMP)
|
||
ON CONFLICT(cookie_id, item_id) DO UPDATE SET
|
||
item_title = excluded.item_title,
|
||
updated_at = CURRENT_TIMESTAMP
|
||
''', (cookie_id, item_id, item_title,
|
||
cookie_id, item_id, cookie_id, item_id, cookie_id, item_id,
|
||
cookie_id, item_id, cookie_id, item_id))
|
||
|
||
self.conn.commit()
|
||
logger.info(f"更新商品标题成功: {item_id} - {item_title}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"更新商品标题失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
def batch_save_item_basic_info(self, items_data: list) -> int:
|
||
"""批量保存商品基本信息(并发安全)
|
||
|
||
Args:
|
||
items_data: 商品数据列表,每个元素包含 cookie_id, item_id, item_title 等字段
|
||
|
||
Returns:
|
||
int: 成功保存的商品数量
|
||
"""
|
||
if not items_data:
|
||
return 0
|
||
|
||
success_count = 0
|
||
try:
|
||
with self.lock:
|
||
cursor = self.conn.cursor()
|
||
|
||
# 使用事务批量处理
|
||
cursor.execute('BEGIN TRANSACTION')
|
||
|
||
for item_data in items_data:
|
||
try:
|
||
cookie_id = item_data.get('cookie_id')
|
||
item_id = item_data.get('item_id')
|
||
item_title = item_data.get('item_title', '')
|
||
item_description = item_data.get('item_description', '')
|
||
item_category = item_data.get('item_category', '')
|
||
item_price = item_data.get('item_price', '')
|
||
item_detail = item_data.get('item_detail', '')
|
||
|
||
if not cookie_id or not item_id:
|
||
continue
|
||
|
||
# 使用 INSERT OR IGNORE + UPDATE 模式
|
||
cursor.execute('''
|
||
INSERT OR IGNORE INTO item_info (cookie_id, item_id, item_title, item_description,
|
||
item_category, item_price, item_detail, created_at, updated_at)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
||
''', (cookie_id, item_id, item_title, item_description,
|
||
item_category, item_price, item_detail))
|
||
|
||
if cursor.rowcount == 0:
|
||
# 记录已存在,进行条件更新
|
||
update_sql = '''
|
||
UPDATE item_info SET
|
||
item_title = CASE WHEN (item_title IS NULL OR item_title = '') AND ? != '' THEN ? ELSE item_title END,
|
||
item_description = CASE WHEN (item_description IS NULL OR item_description = '') AND ? != '' THEN ? ELSE item_description END,
|
||
item_category = CASE WHEN (item_category IS NULL OR item_category = '') AND ? != '' THEN ? ELSE item_category END,
|
||
item_price = CASE WHEN (item_price IS NULL OR item_price = '') AND ? != '' THEN ? ELSE item_price END,
|
||
item_detail = CASE WHEN (item_detail IS NULL OR item_detail = '' OR TRIM(item_detail) = '') AND ? != '' THEN ? ELSE item_detail END,
|
||
updated_at = CURRENT_TIMESTAMP
|
||
WHERE cookie_id = ? AND item_id = ?
|
||
'''
|
||
self._execute_sql(cursor, update_sql, (
|
||
item_title, item_title,
|
||
item_description, item_description,
|
||
item_category, item_category,
|
||
item_price, item_price,
|
||
item_detail, item_detail,
|
||
cookie_id, item_id
|
||
))
|
||
|
||
success_count += 1
|
||
|
||
except Exception as item_e:
|
||
logger.warning(f"批量保存单个商品失败 {item_data.get('item_id', 'unknown')}: {item_e}")
|
||
continue
|
||
|
||
cursor.execute('COMMIT')
|
||
logger.info(f"批量保存商品信息完成: {success_count}/{len(items_data)} 个商品")
|
||
return success_count
|
||
|
||
except Exception as e:
|
||
logger.error(f"批量保存商品信息失败: {e}")
|
||
try:
|
||
cursor.execute('ROLLBACK')
|
||
except:
|
||
pass
|
||
return success_count
|
||
|
||
def delete_item_info(self, cookie_id: str, item_id: str) -> bool:
|
||
"""删除商品信息
|
||
|
||
Args:
|
||
cookie_id: Cookie ID
|
||
item_id: 商品ID
|
||
|
||
Returns:
|
||
bool: 操作是否成功
|
||
"""
|
||
try:
|
||
with self.lock:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('DELETE FROM item_info WHERE cookie_id = ? AND item_id = ?',
|
||
(cookie_id, item_id))
|
||
|
||
if cursor.rowcount > 0:
|
||
self.conn.commit()
|
||
logger.info(f"删除商品信息成功: {cookie_id} - {item_id}")
|
||
return True
|
||
else:
|
||
logger.warning(f"未找到要删除的商品信息: {cookie_id} - {item_id}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"删除商品信息失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
def batch_delete_item_info(self, items_to_delete: list) -> int:
|
||
"""批量删除商品信息
|
||
|
||
Args:
|
||
items_to_delete: 要删除的商品列表,每个元素包含 cookie_id 和 item_id
|
||
|
||
Returns:
|
||
int: 成功删除的商品数量
|
||
"""
|
||
if not items_to_delete:
|
||
return 0
|
||
|
||
success_count = 0
|
||
try:
|
||
with self.lock:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('BEGIN TRANSACTION')
|
||
|
||
for item_data in items_to_delete:
|
||
try:
|
||
cookie_id = item_data.get('cookie_id')
|
||
item_id = item_data.get('item_id')
|
||
|
||
if not cookie_id or not item_id:
|
||
continue
|
||
|
||
cursor.execute('DELETE FROM item_info WHERE cookie_id = ? AND item_id = ?',
|
||
(cookie_id, item_id))
|
||
|
||
if cursor.rowcount > 0:
|
||
success_count += 1
|
||
logger.debug(f"删除商品信息: {cookie_id} - {item_id}")
|
||
|
||
except Exception as item_e:
|
||
logger.warning(f"删除单个商品失败 {item_data.get('item_id', 'unknown')}: {item_e}")
|
||
continue
|
||
|
||
cursor.execute('COMMIT')
|
||
logger.info(f"批量删除商品信息完成: {success_count}/{len(items_to_delete)} 个商品")
|
||
return success_count
|
||
|
||
except Exception as e:
|
||
logger.error(f"批量删除商品信息失败: {e}")
|
||
try:
|
||
cursor.execute('ROLLBACK')
|
||
except:
|
||
pass
|
||
return success_count
|
||
|
||
# ==================== 用户设置管理方法 ====================
|
||
|
||
def get_user_settings(self, user_id: int):
|
||
"""获取用户的所有设置"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
SELECT key, value, description, updated_at
|
||
FROM user_settings
|
||
WHERE user_id = ?
|
||
ORDER BY key
|
||
''', (user_id,))
|
||
|
||
settings = {}
|
||
for row in cursor.fetchall():
|
||
settings[row[0]] = {
|
||
'value': row[1],
|
||
'description': row[2],
|
||
'updated_at': row[3]
|
||
}
|
||
|
||
return settings
|
||
except Exception as e:
|
||
logger.error(f"获取用户设置失败: {e}")
|
||
return {}
|
||
|
||
def get_user_setting(self, user_id: int, key: str):
|
||
"""获取用户的特定设置"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
SELECT value, description, updated_at
|
||
FROM user_settings
|
||
WHERE user_id = ? AND key = ?
|
||
''', (user_id, key))
|
||
|
||
row = cursor.fetchone()
|
||
if row:
|
||
return {
|
||
'key': key,
|
||
'value': row[0],
|
||
'description': row[1],
|
||
'updated_at': row[2]
|
||
}
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取用户设置失败: {e}")
|
||
return None
|
||
|
||
def set_user_setting(self, user_id: int, key: str, value: str, description: str = None):
|
||
"""设置用户配置"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
INSERT OR REPLACE INTO user_settings (user_id, key, value, description, updated_at)
|
||
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
|
||
''', (user_id, key, value, description))
|
||
|
||
self.conn.commit()
|
||
logger.info(f"用户设置更新成功: user_id={user_id}, key={key}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"设置用户配置失败: {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
# ==================== 管理员专用方法 ====================
|
||
|
||
def get_all_users(self):
|
||
"""获取所有用户信息(管理员专用)"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
SELECT id, username, email, created_at, updated_at
|
||
FROM users
|
||
ORDER BY created_at DESC
|
||
''')
|
||
|
||
users = []
|
||
for row in cursor.fetchall():
|
||
users.append({
|
||
'id': row[0],
|
||
'username': row[1],
|
||
'email': row[2],
|
||
'created_at': row[3],
|
||
'updated_at': row[4]
|
||
})
|
||
|
||
return users
|
||
except Exception as e:
|
||
logger.error(f"获取所有用户失败: {e}")
|
||
return []
|
||
|
||
def get_user_by_id(self, user_id: int):
|
||
"""根据ID获取用户信息"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute('''
|
||
SELECT id, username, email, created_at, updated_at
|
||
FROM users
|
||
WHERE id = ?
|
||
''', (user_id,))
|
||
|
||
row = cursor.fetchone()
|
||
if row:
|
||
return {
|
||
'id': row[0],
|
||
'username': row[1],
|
||
'email': row[2],
|
||
'created_at': row[3],
|
||
'updated_at': row[4]
|
||
}
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取用户信息失败: {e}")
|
||
return None
|
||
|
||
def delete_user_and_data(self, user_id: int):
|
||
"""删除用户及其所有相关数据"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
|
||
# 开始事务
|
||
cursor.execute('BEGIN TRANSACTION')
|
||
|
||
# 删除用户相关的所有数据
|
||
# 1. 删除用户设置
|
||
cursor.execute('DELETE FROM user_settings WHERE user_id = ?', (user_id,))
|
||
|
||
# 2. 删除用户的卡券
|
||
cursor.execute('DELETE FROM cards WHERE user_id = ?', (user_id,))
|
||
|
||
# 3. 删除用户的发货规则
|
||
cursor.execute('DELETE FROM delivery_rules WHERE user_id = ?', (user_id,))
|
||
|
||
# 4. 删除用户的通知渠道
|
||
cursor.execute('DELETE FROM notification_channels WHERE user_id = ?', (user_id,))
|
||
|
||
# 5. 删除用户的Cookie
|
||
cursor.execute('DELETE FROM cookies WHERE user_id = ?', (user_id,))
|
||
|
||
# 6. 删除用户的关键字
|
||
cursor.execute('DELETE FROM keywords WHERE cookie_id IN (SELECT id FROM cookies WHERE user_id = ?)', (user_id,))
|
||
|
||
# 7. 删除用户的默认回复
|
||
cursor.execute('DELETE FROM default_replies WHERE cookie_id IN (SELECT id FROM cookies WHERE user_id = ?)', (user_id,))
|
||
|
||
# 8. 删除用户的AI回复设置
|
||
cursor.execute('DELETE FROM ai_reply_settings WHERE cookie_id IN (SELECT id FROM cookies WHERE user_id = ?)', (user_id,))
|
||
|
||
# 9. 删除用户的消息通知
|
||
cursor.execute('DELETE FROM message_notifications WHERE cookie_id IN (SELECT id FROM cookies WHERE user_id = ?)', (user_id,))
|
||
|
||
# 10. 最后删除用户本身
|
||
cursor.execute('DELETE FROM users WHERE id = ?', (user_id,))
|
||
|
||
# 提交事务
|
||
cursor.execute('COMMIT')
|
||
|
||
logger.info(f"用户及相关数据删除成功: user_id={user_id}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
# 回滚事务
|
||
cursor.execute('ROLLBACK')
|
||
logger.error(f"删除用户及相关数据失败: {e}")
|
||
return False
|
||
|
||
def get_table_data(self, table_name: str):
|
||
"""获取指定表的所有数据"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
|
||
# 获取表结构
|
||
cursor.execute(f"PRAGMA table_info({table_name})")
|
||
columns_info = cursor.fetchall()
|
||
columns = [col[1] for col in columns_info] # 列名
|
||
|
||
# 获取表数据
|
||
cursor.execute(f"SELECT * FROM {table_name}")
|
||
rows = cursor.fetchall()
|
||
|
||
# 转换为字典列表
|
||
data = []
|
||
for row in rows:
|
||
row_dict = {}
|
||
for i, value in enumerate(row):
|
||
row_dict[columns[i]] = value
|
||
data.append(row_dict)
|
||
|
||
return data, columns
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取表数据失败: {table_name} - {e}")
|
||
return [], []
|
||
|
||
def delete_table_record(self, table_name: str, record_id: str):
|
||
"""删除指定表的指定记录"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
|
||
# 根据表名确定主键字段
|
||
primary_key_map = {
|
||
'users': 'id',
|
||
'cookies': 'id',
|
||
'keywords': 'id',
|
||
'default_replies': 'id',
|
||
'ai_reply_settings': 'id',
|
||
'message_notifications': 'id',
|
||
'cards': 'id',
|
||
'delivery_rules': 'id',
|
||
'notification_channels': 'id',
|
||
'user_settings': 'id',
|
||
'email_verifications': 'id',
|
||
'captcha_codes': 'id'
|
||
}
|
||
|
||
primary_key = primary_key_map.get(table_name, 'id')
|
||
|
||
# 删除记录
|
||
cursor.execute(f"DELETE FROM {table_name} WHERE {primary_key} = ?", (record_id,))
|
||
|
||
if cursor.rowcount > 0:
|
||
self.conn.commit()
|
||
logger.info(f"删除表记录成功: {table_name}.{record_id}")
|
||
return True
|
||
else:
|
||
logger.warning(f"删除表记录失败,记录不存在: {table_name}.{record_id}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"删除表记录失败: {table_name}.{record_id} - {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
def clear_table_data(self, table_name: str):
|
||
"""清空指定表的所有数据"""
|
||
with self.lock:
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
|
||
# 清空表数据
|
||
cursor.execute(f"DELETE FROM {table_name}")
|
||
|
||
# 重置自增ID(如果有的话)
|
||
cursor.execute(f"DELETE FROM sqlite_sequence WHERE name = ?", (table_name,))
|
||
|
||
self.conn.commit()
|
||
logger.info(f"清空表数据成功: {table_name}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"清空表数据失败: {table_name} - {e}")
|
||
self.conn.rollback()
|
||
return False
|
||
|
||
|
||
# 全局单例
|
||
db_manager = DBManager()
|
||
|
||
# 确保进程结束时关闭数据库连接
|
||
import atexit
|
||
atexit.register(db_manager.close) |