xianyu-auto-reply/db_manager.py
2025-07-31 23:17:33 +08:00

3295 lines
139 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sqlite3
import os
import threading
import hashlib
import time
import json
import random
import string
import aiohttp
import io
import base64
from PIL import Image, ImageDraw, ImageFont
from typing import List, Tuple, Dict, Optional, Any
from loguru import logger
class DBManager:
"""SQLite数据库管理持久化存储Cookie和关键字"""
def __init__(self, db_path: str = None):
"""初始化数据库连接和表结构"""
# 支持环境变量配置数据库路径
if db_path is None:
db_path = os.getenv('DB_PATH', 'xianyu_data.db')
# 确保数据目录存在并有正确权限
db_dir = os.path.dirname(db_path)
if db_dir and not os.path.exists(db_dir):
try:
os.makedirs(db_dir, mode=0o755, exist_ok=True)
logger.info(f"创建数据目录: {db_dir}")
except PermissionError as e:
logger.error(f"创建数据目录失败,权限不足: {e}")
# 尝试使用当前目录
db_path = os.path.basename(db_path)
logger.warning(f"使用当前目录作为数据库路径: {db_path}")
except Exception as e:
logger.error(f"创建数据目录失败: {e}")
raise
# 检查目录权限
if db_dir and os.path.exists(db_dir):
if not os.access(db_dir, os.W_OK):
logger.error(f"数据目录没有写权限: {db_dir}")
# 尝试使用当前目录
db_path = os.path.basename(db_path)
logger.warning(f"使用当前目录作为数据库路径: {db_path}")
self.db_path = db_path
logger.info(f"数据库路径: {self.db_path}")
self.conn = None
self.lock = threading.RLock() # 使用可重入锁保护数据库操作
# SQL日志配置 - 默认启用
self.sql_log_enabled = True # 默认启用SQL日志
self.sql_log_level = 'INFO' # 默认使用INFO级别
# 允许通过环境变量覆盖默认设置
if os.getenv('SQL_LOG_ENABLED'):
self.sql_log_enabled = os.getenv('SQL_LOG_ENABLED', 'true').lower() == 'true'
if os.getenv('SQL_LOG_LEVEL'):
self.sql_log_level = os.getenv('SQL_LOG_LEVEL', 'INFO').upper()
logger.info(f"SQL日志已启用日志级别: {self.sql_log_level}")
self.init_db()
def init_db(self):
"""初始化数据库表结构"""
try:
self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
cursor = self.conn.cursor()
# 创建用户表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建邮箱验证码表
cursor.execute('''
CREATE TABLE IF NOT EXISTS email_verifications (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT NOT NULL,
code TEXT NOT NULL,
expires_at TIMESTAMP NOT NULL,
used BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建图形验证码表
cursor.execute('''
CREATE TABLE IF NOT EXISTS captcha_codes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
code TEXT NOT NULL,
expires_at TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建cookies表添加user_id字段和auto_confirm字段
cursor.execute('''
CREATE TABLE IF NOT EXISTS cookies (
id TEXT PRIMARY KEY,
value TEXT NOT NULL,
user_id INTEGER NOT NULL,
auto_confirm INTEGER DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
)
''')
# 创建keywords表
cursor.execute('''
CREATE TABLE IF NOT EXISTS keywords (
cookie_id TEXT,
keyword TEXT,
reply TEXT,
item_id TEXT,
PRIMARY KEY (cookie_id, keyword),
FOREIGN KEY (cookie_id) REFERENCES cookies(id) ON DELETE CASCADE
)
''')
# 创建cookie_status表
cursor.execute('''
CREATE TABLE IF NOT EXISTS cookie_status (
cookie_id TEXT PRIMARY KEY,
enabled BOOLEAN DEFAULT TRUE,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (cookie_id) REFERENCES cookies(id) ON DELETE CASCADE
)
''')
# 创建AI回复配置表
cursor.execute('''
CREATE TABLE IF NOT EXISTS ai_reply_settings (
cookie_id TEXT PRIMARY KEY,
ai_enabled BOOLEAN DEFAULT FALSE,
model_name TEXT DEFAULT 'qwen-plus',
api_key TEXT,
base_url TEXT DEFAULT 'https://dashscope.aliyuncs.com/compatible-mode/v1',
max_discount_percent INTEGER DEFAULT 10,
max_discount_amount INTEGER DEFAULT 100,
max_bargain_rounds INTEGER DEFAULT 3,
custom_prompts TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (cookie_id) REFERENCES cookies(id) ON DELETE CASCADE
)
''')
# 创建AI对话历史表
cursor.execute('''
CREATE TABLE IF NOT EXISTS ai_conversations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cookie_id TEXT NOT NULL,
chat_id TEXT NOT NULL,
user_id TEXT NOT NULL,
item_id TEXT NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
intent TEXT,
bargain_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (cookie_id) REFERENCES cookies (id) ON DELETE CASCADE
)
''')
# 创建AI商品信息缓存表
cursor.execute('''
CREATE TABLE IF NOT EXISTS ai_item_cache (
item_id TEXT PRIMARY KEY,
data TEXT NOT NULL,
price REAL,
description TEXT,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建卡券表
cursor.execute('''
CREATE TABLE IF NOT EXISTS cards (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
type TEXT NOT NULL CHECK (type IN ('api', 'text', 'data')),
api_config TEXT,
text_content TEXT,
data_content TEXT,
description TEXT,
enabled BOOLEAN DEFAULT TRUE,
delay_seconds INTEGER DEFAULT 0,
is_multi_spec BOOLEAN DEFAULT FALSE,
spec_name TEXT,
spec_value TEXT,
user_id INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
# 检查并添加 user_id 列(用于数据库迁移)
try:
self._execute_sql(cursor, "SELECT user_id FROM cards LIMIT 1")
except sqlite3.OperationalError:
# user_id 列不存在,需要添加
logger.info("正在为 cards 表添加 user_id 列...")
self._execute_sql(cursor, "ALTER TABLE cards ADD COLUMN user_id INTEGER NOT NULL DEFAULT 1")
self._execute_sql(cursor, "CREATE INDEX IF NOT EXISTS idx_cards_user_id ON cards(user_id)")
logger.info("cards 表 user_id 列添加完成")
# 检查并添加 delay_seconds 列(用于自动发货延时功能)
try:
self._execute_sql(cursor, "SELECT delay_seconds FROM cards LIMIT 1")
except sqlite3.OperationalError:
# delay_seconds 列不存在,需要添加
logger.info("正在为 cards 表添加 delay_seconds 列...")
self._execute_sql(cursor, "ALTER TABLE cards ADD COLUMN delay_seconds INTEGER DEFAULT 0")
logger.info("cards 表 delay_seconds 列添加完成")
# 检查并添加 item_id 列用于自动回复商品ID功能
try:
self._execute_sql(cursor, "SELECT item_id FROM keywords LIMIT 1")
except sqlite3.OperationalError:
# item_id 列不存在,需要添加
logger.info("正在为 keywords 表添加 item_id 列...")
self._execute_sql(cursor, "ALTER TABLE keywords ADD COLUMN item_id TEXT")
logger.info("keywords 表 item_id 列添加完成")
# 创建商品信息表
cursor.execute('''
CREATE TABLE IF NOT EXISTS item_info (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cookie_id TEXT NOT NULL,
item_id TEXT NOT NULL,
item_title TEXT,
item_description TEXT,
item_category TEXT,
item_price TEXT,
item_detail TEXT,
is_multi_spec BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (cookie_id) REFERENCES cookies(id) ON DELETE CASCADE,
UNIQUE(cookie_id, item_id)
)
''')
# 创建自动发货规则表
cursor.execute('''
CREATE TABLE IF NOT EXISTS delivery_rules (
id INTEGER PRIMARY KEY AUTOINCREMENT,
keyword TEXT NOT NULL,
card_id INTEGER NOT NULL,
delivery_count INTEGER DEFAULT 1,
enabled BOOLEAN DEFAULT TRUE,
description TEXT,
delivery_times INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (card_id) REFERENCES cards(id) ON DELETE CASCADE
)
''')
# 创建默认回复表
cursor.execute('''
CREATE TABLE IF NOT EXISTS default_replies (
cookie_id TEXT PRIMARY KEY,
enabled BOOLEAN DEFAULT FALSE,
reply_content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (cookie_id) REFERENCES cookies(id) ON DELETE CASCADE
)
''')
# 创建通知渠道表
cursor.execute('''
CREATE TABLE IF NOT EXISTS notification_channels (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
type TEXT NOT NULL CHECK (type IN ('qq')),
config TEXT NOT NULL,
enabled BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建消息通知配置表
cursor.execute('''
CREATE TABLE IF NOT EXISTS message_notifications (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cookie_id TEXT NOT NULL,
channel_id INTEGER NOT NULL,
enabled BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (cookie_id) REFERENCES cookies(id) ON DELETE CASCADE,
FOREIGN KEY (channel_id) REFERENCES notification_channels(id) ON DELETE CASCADE,
UNIQUE(cookie_id, channel_id)
)
''')
# 创建系统设置表
cursor.execute('''
CREATE TABLE IF NOT EXISTS system_settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
description TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建用户设置表
cursor.execute('''
CREATE TABLE IF NOT EXISTS user_settings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
UNIQUE(user_id, key)
)
''')
# 插入默认系统设置不包括管理员密码由reply_server.py初始化
cursor.execute('''
INSERT OR IGNORE INTO system_settings (key, value, description) VALUES
('theme_color', 'blue', '主题颜色')
''')
# 创建默认admin用户只在首次初始化时创建
cursor.execute('SELECT COUNT(*) FROM users WHERE username = ?', ('admin',))
admin_exists = cursor.fetchone()[0] > 0
if not admin_exists:
# 首次创建admin用户设置默认密码
default_password_hash = hashlib.sha256("admin123".encode()).hexdigest()
cursor.execute('''
INSERT INTO users (username, email, password_hash) VALUES
('admin', 'admin@localhost', ?)
''', (default_password_hash,))
logger.info("创建默认admin用户密码: admin123")
# 获取admin用户ID用于历史数据绑定
self._execute_sql(cursor, "SELECT id FROM users WHERE username = 'admin'")
admin_user = cursor.fetchone()
if admin_user:
admin_user_id = admin_user[0]
# 将历史cookies数据绑定到admin用户如果user_id列不存在
try:
self._execute_sql(cursor, "SELECT user_id FROM cookies LIMIT 1")
except sqlite3.OperationalError:
# user_id列不存在需要添加并更新历史数据
self._execute_sql(cursor, "ALTER TABLE cookies ADD COLUMN user_id INTEGER")
self._execute_sql(cursor, "UPDATE cookies SET user_id = ? WHERE user_id IS NULL", (admin_user_id,))
else:
# user_id列存在更新NULL值
self._execute_sql(cursor, "UPDATE cookies SET user_id = ? WHERE user_id IS NULL", (admin_user_id,))
# 为cookies表添加auto_confirm字段如果不存在
try:
self._execute_sql(cursor, "SELECT auto_confirm FROM cookies LIMIT 1")
except sqlite3.OperationalError:
# auto_confirm列不存在需要添加并设置默认值
self._execute_sql(cursor, "ALTER TABLE cookies ADD COLUMN auto_confirm INTEGER DEFAULT 1")
self._execute_sql(cursor, "UPDATE cookies SET auto_confirm = 1 WHERE auto_confirm IS NULL")
else:
# auto_confirm列存在更新NULL值
self._execute_sql(cursor, "UPDATE cookies SET auto_confirm = 1 WHERE auto_confirm IS NULL")
# 为delivery_rules表添加user_id字段如果不存在
try:
self._execute_sql(cursor, "SELECT user_id FROM delivery_rules LIMIT 1")
except sqlite3.OperationalError:
# user_id列不存在需要添加并更新历史数据
self._execute_sql(cursor, "ALTER TABLE delivery_rules ADD COLUMN user_id INTEGER")
self._execute_sql(cursor, "UPDATE delivery_rules SET user_id = ? WHERE user_id IS NULL", (admin_user_id,))
else:
# user_id列存在更新NULL值
self._execute_sql(cursor, "UPDATE delivery_rules SET user_id = ? WHERE user_id IS NULL", (admin_user_id,))
# 为notification_channels表添加user_id字段如果不存在
try:
self._execute_sql(cursor, "SELECT user_id FROM notification_channels LIMIT 1")
except sqlite3.OperationalError:
# user_id列不存在需要添加并更新历史数据
self._execute_sql(cursor, "ALTER TABLE notification_channels ADD COLUMN user_id INTEGER")
self._execute_sql(cursor, "UPDATE notification_channels SET user_id = ? WHERE user_id IS NULL", (admin_user_id,))
else:
# user_id列存在更新NULL值
self._execute_sql(cursor, "UPDATE notification_channels SET user_id = ? WHERE user_id IS NULL", (admin_user_id,))
# 为email_verifications表添加type字段如果不存在
try:
self._execute_sql(cursor, "SELECT type FROM email_verifications LIMIT 1")
except sqlite3.OperationalError:
# type列不存在需要添加并更新历史数据
self._execute_sql(cursor, "ALTER TABLE email_verifications ADD COLUMN type TEXT DEFAULT 'register'")
self._execute_sql(cursor, "UPDATE email_verifications SET type = 'register' WHERE type IS NULL")
else:
# type列存在更新NULL值
self._execute_sql(cursor, "UPDATE email_verifications SET type = 'register' WHERE type IS NULL")
# 为cards表添加多规格字段如果不存在
try:
self._execute_sql(cursor, "SELECT is_multi_spec FROM cards LIMIT 1")
except sqlite3.OperationalError:
# 多规格字段不存在,需要添加
self._execute_sql(cursor, "ALTER TABLE cards ADD COLUMN is_multi_spec BOOLEAN DEFAULT FALSE")
self._execute_sql(cursor, "ALTER TABLE cards ADD COLUMN spec_name TEXT")
self._execute_sql(cursor, "ALTER TABLE cards ADD COLUMN spec_value TEXT")
logger.info("为cards表添加多规格字段")
# 为item_info表添加多规格字段如果不存在
try:
self._execute_sql(cursor, "SELECT is_multi_spec FROM item_info LIMIT 1")
except sqlite3.OperationalError:
# 多规格字段不存在,需要添加
self._execute_sql(cursor, "ALTER TABLE item_info ADD COLUMN is_multi_spec BOOLEAN DEFAULT FALSE")
logger.info("为item_info表添加多规格字段")
self.conn.commit()
logger.info(f"数据库初始化成功: {self.db_path}")
except Exception as e:
logger.error(f"数据库初始化失败: {e}")
if self.conn:
self.conn.close()
raise
def close(self):
"""关闭数据库连接"""
if self.conn:
self.conn.close()
self.conn = None
def get_connection(self):
"""获取数据库连接,如果已关闭则重新连接"""
if self.conn is None:
self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
return self.conn
def _log_sql(self, sql: str, params: tuple = None, operation: str = "EXECUTE"):
"""记录SQL执行日志"""
if not self.sql_log_enabled:
return
# 格式化参数
params_str = ""
if params:
if isinstance(params, (list, tuple)):
if len(params) > 0:
# 限制参数长度,避免日志过长
formatted_params = []
for param in params:
if isinstance(param, str) and len(param) > 100:
formatted_params.append(f"{param[:100]}...")
else:
formatted_params.append(repr(param))
params_str = f" | 参数: [{', '.join(formatted_params)}]"
# 格式化SQL移除多余空白
formatted_sql = ' '.join(sql.split())
# 根据配置的日志级别输出
log_message = f"🗄️ SQL {operation}: {formatted_sql}{params_str}"
if self.sql_log_level == 'DEBUG':
logger.debug(log_message)
elif self.sql_log_level == 'INFO':
logger.info(log_message)
elif self.sql_log_level == 'WARNING':
logger.warning(log_message)
else:
logger.debug(log_message)
def _execute_sql(self, cursor, sql: str, params: tuple = None):
"""执行SQL并记录日志"""
self._log_sql(sql, params, "EXECUTE")
if params:
return cursor.execute(sql, params)
else:
return cursor.execute(sql)
def _executemany_sql(self, cursor, sql: str, params_list):
"""批量执行SQL并记录日志"""
self._log_sql(sql, f"批量执行 {len(params_list)} 条记录", "EXECUTEMANY")
return cursor.executemany(sql, params_list)
# -------------------- Cookie操作 --------------------
def save_cookie(self, cookie_id: str, cookie_value: str, user_id: int = None) -> bool:
"""保存Cookie到数据库如存在则更新"""
with self.lock:
try:
cursor = self.conn.cursor()
# 如果没有提供user_id尝试从现有记录获取否则使用admin用户ID
if user_id is None:
self._execute_sql(cursor, "SELECT user_id FROM cookies WHERE id = ?", (cookie_id,))
existing = cursor.fetchone()
if existing:
user_id = existing[0]
else:
# 获取admin用户ID作为默认值
self._execute_sql(cursor, "SELECT id FROM users WHERE username = 'admin'")
admin_user = cursor.fetchone()
user_id = admin_user[0] if admin_user else 1
self._execute_sql(cursor,
"INSERT OR REPLACE INTO cookies (id, value, user_id) VALUES (?, ?, ?)",
(cookie_id, cookie_value, user_id)
)
self.conn.commit()
logger.info(f"Cookie保存成功: {cookie_id} (用户ID: {user_id})")
# 验证保存结果
self._execute_sql(cursor, "SELECT user_id FROM cookies WHERE id = ?", (cookie_id,))
saved_user_id = cursor.fetchone()
if saved_user_id:
logger.info(f"Cookie保存验证: {cookie_id} 实际绑定到用户ID: {saved_user_id[0]}")
else:
logger.error(f"Cookie保存验证失败: {cookie_id} 未找到记录")
return True
except Exception as e:
logger.error(f"Cookie保存失败: {e}")
self.conn.rollback()
return False
def delete_cookie(self, cookie_id: str) -> bool:
"""从数据库删除Cookie及其关键字"""
with self.lock:
try:
cursor = self.conn.cursor()
# 删除关联的关键字
self._execute_sql(cursor, "DELETE FROM keywords WHERE cookie_id = ?", (cookie_id,))
# 删除Cookie
self._execute_sql(cursor, "DELETE FROM cookies WHERE id = ?", (cookie_id,))
self.conn.commit()
logger.debug(f"Cookie删除成功: {cookie_id}")
return True
except Exception as e:
logger.error(f"Cookie删除失败: {e}")
self.conn.rollback()
return False
def get_cookie(self, cookie_id: str) -> Optional[str]:
"""获取指定Cookie值"""
with self.lock:
try:
cursor = self.conn.cursor()
self._execute_sql(cursor, "SELECT value FROM cookies WHERE id = ?", (cookie_id,))
result = cursor.fetchone()
return result[0] if result else None
except Exception as e:
logger.error(f"获取Cookie失败: {e}")
return None
def get_all_cookies(self, user_id: int = None) -> Dict[str, str]:
"""获取所有Cookie支持用户隔离"""
with self.lock:
try:
cursor = self.conn.cursor()
if user_id is not None:
self._execute_sql(cursor, "SELECT id, value FROM cookies WHERE user_id = ?", (user_id,))
else:
self._execute_sql(cursor, "SELECT id, value FROM cookies")
return {row[0]: row[1] for row in cursor.fetchall()}
except Exception as e:
logger.error(f"获取所有Cookie失败: {e}")
return {}
def get_cookie_by_id(self, cookie_id: str) -> Optional[Dict[str, str]]:
"""根据ID获取Cookie信息
Args:
cookie_id: Cookie ID
Returns:
Dict包含cookie信息包括cookies_str字段如果不存在返回None
"""
with self.lock:
try:
cursor = self.conn.cursor()
self._execute_sql(cursor, "SELECT id, value, created_at FROM cookies WHERE id = ?", (cookie_id,))
result = cursor.fetchone()
if result:
return {
'id': result[0],
'cookies_str': result[1], # 使用cookies_str字段名以匹配调用方期望
'value': result[1], # 保持向后兼容
'created_at': result[2]
}
return None
except Exception as e:
logger.error(f"根据ID获取Cookie失败: {e}")
return None
def get_cookie_details(self, cookie_id: str) -> Optional[Dict[str, any]]:
"""获取Cookie的详细信息包括user_id和auto_confirm"""
with self.lock:
try:
cursor = self.conn.cursor()
self._execute_sql(cursor, "SELECT id, value, user_id, auto_confirm, created_at FROM cookies WHERE id = ?", (cookie_id,))
result = cursor.fetchone()
if result:
return {
'id': result[0],
'value': result[1],
'user_id': result[2],
'auto_confirm': bool(result[3]),
'created_at': result[4]
}
return None
except Exception as e:
logger.error(f"获取Cookie详细信息失败: {e}")
return None
def update_auto_confirm(self, cookie_id: str, auto_confirm: bool) -> bool:
"""更新Cookie的自动确认发货设置"""
with self.lock:
try:
cursor = self.conn.cursor()
self._execute_sql(cursor, "UPDATE cookies SET auto_confirm = ? WHERE id = ?", (int(auto_confirm), cookie_id))
self.conn.commit()
logger.info(f"更新账号 {cookie_id} 自动确认发货设置: {'开启' if auto_confirm else '关闭'}")
return True
except Exception as e:
logger.error(f"更新自动确认发货设置失败: {e}")
return False
def get_auto_confirm(self, cookie_id: str) -> bool:
"""获取Cookie的自动确认发货设置"""
with self.lock:
try:
cursor = self.conn.cursor()
self._execute_sql(cursor, "SELECT auto_confirm FROM cookies WHERE id = ?", (cookie_id,))
result = cursor.fetchone()
if result:
return bool(result[0])
return True # 默认开启
except Exception as e:
logger.error(f"获取自动确认发货设置失败: {e}")
return True # 出错时默认开启
# -------------------- 关键字操作 --------------------
def save_keywords(self, cookie_id: str, keywords: List[Tuple[str, str]]) -> bool:
"""保存关键字列表,先删除旧数据再插入新数据(向后兼容方法)"""
# 转换为新格式不包含item_id
keywords_with_item_id = [(keyword, reply, None) for keyword, reply in keywords]
return self.save_keywords_with_item_id(cookie_id, keywords_with_item_id)
def save_keywords_with_item_id(self, cookie_id: str, keywords: List[Tuple[str, str, str]]) -> bool:
"""保存关键字列表包含商品ID先删除旧数据再插入新数据"""
with self.lock:
try:
cursor = self.conn.cursor()
# 先删除该cookie_id的所有关键字
self._execute_sql(cursor, "DELETE FROM keywords WHERE cookie_id = ?", (cookie_id,))
# 插入新关键字
for keyword, reply, item_id in keywords:
self._execute_sql(cursor,
"INSERT INTO keywords (cookie_id, keyword, reply, item_id) VALUES (?, ?, ?, ?)",
(cookie_id, keyword, reply, item_id))
self.conn.commit()
logger.info(f"关键字保存成功: {cookie_id}, {len(keywords)}")
return True
except Exception as e:
logger.error(f"关键字保存失败: {e}")
self.conn.rollback()
return False
def get_keywords(self, cookie_id: str) -> List[Tuple[str, str]]:
"""获取指定Cookie的关键字列表向后兼容方法"""
with self.lock:
try:
cursor = self.conn.cursor()
self._execute_sql(cursor, "SELECT keyword, reply FROM keywords WHERE cookie_id = ?", (cookie_id,))
return [(row[0], row[1]) for row in cursor.fetchall()]
except Exception as e:
logger.error(f"获取关键字失败: {e}")
return []
def get_keywords_with_item_id(self, cookie_id: str) -> List[Tuple[str, str, str]]:
"""获取指定Cookie的关键字列表包含商品ID"""
with self.lock:
try:
cursor = self.conn.cursor()
self._execute_sql(cursor, "SELECT keyword, reply, item_id FROM keywords WHERE cookie_id = ?", (cookie_id,))
return [(row[0], row[1], row[2]) for row in cursor.fetchall()]
except Exception as e:
logger.error(f"获取关键字失败: {e}")
return []
def check_keyword_duplicate(self, cookie_id: str, keyword: str, item_id: str = None) -> bool:
"""检查关键词是否重复"""
with self.lock:
try:
cursor = self.conn.cursor()
if item_id:
# 如果有商品ID检查相同cookie_id、keyword、item_id的组合
self._execute_sql(cursor,
"SELECT COUNT(*) FROM keywords WHERE cookie_id = ? AND keyword = ? AND item_id = ?",
(cookie_id, keyword, item_id))
else:
# 如果没有商品ID检查相同cookie_id、keyword且item_id为空的组合
self._execute_sql(cursor,
"SELECT COUNT(*) FROM keywords WHERE cookie_id = ? AND keyword = ? AND (item_id IS NULL OR item_id = '')",
(cookie_id, keyword))
count = cursor.fetchone()[0]
return count > 0
except Exception as e:
logger.error(f"检查关键词重复失败: {e}")
return False
def get_all_keywords(self, user_id: int = None) -> Dict[str, List[Tuple[str, str]]]:
"""获取所有Cookie的关键字支持用户隔离"""
with self.lock:
try:
cursor = self.conn.cursor()
if user_id is not None:
cursor.execute("""
SELECT k.cookie_id, k.keyword, k.reply
FROM keywords k
JOIN cookies c ON k.cookie_id = c.id
WHERE c.user_id = ?
""", (user_id,))
else:
self._execute_sql(cursor, "SELECT cookie_id, keyword, reply FROM keywords")
result = {}
for row in cursor.fetchall():
cookie_id, keyword, reply = row
if cookie_id not in result:
result[cookie_id] = []
result[cookie_id].append((keyword, reply))
return result
except Exception as e:
logger.error(f"获取所有关键字失败: {e}")
return {}
def save_cookie_status(self, cookie_id: str, enabled: bool):
"""保存Cookie的启用状态"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO cookie_status (cookie_id, enabled, updated_at)
VALUES (?, ?, CURRENT_TIMESTAMP)
''', (cookie_id, enabled))
self.conn.commit()
logger.debug(f"保存Cookie状态: {cookie_id} -> {'启用' if enabled else '禁用'}")
except Exception as e:
logger.error(f"保存Cookie状态失败: {e}")
raise
def get_cookie_status(self, cookie_id: str) -> bool:
"""获取Cookie的启用状态"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('SELECT enabled FROM cookie_status WHERE cookie_id = ?', (cookie_id,))
result = cursor.fetchone()
return bool(result[0]) if result else True # 默认启用
except Exception as e:
logger.error(f"获取Cookie状态失败: {e}")
return True # 出错时默认启用
def get_all_cookie_status(self) -> Dict[str, bool]:
"""获取所有Cookie的启用状态"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('SELECT cookie_id, enabled FROM cookie_status')
result = {}
for row in cursor.fetchall():
cookie_id, enabled = row
result[cookie_id] = bool(enabled)
return result
except Exception as e:
logger.error(f"获取所有Cookie状态失败: {e}")
return {}
# -------------------- AI回复设置操作 --------------------
def save_ai_reply_settings(self, cookie_id: str, settings: dict) -> bool:
"""保存AI回复设置"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO ai_reply_settings
(cookie_id, ai_enabled, model_name, api_key, base_url,
max_discount_percent, max_discount_amount, max_bargain_rounds,
custom_prompts, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
''', (
cookie_id,
settings.get('ai_enabled', False),
settings.get('model_name', 'qwen-plus'),
settings.get('api_key', ''),
settings.get('base_url', 'https://dashscope.aliyuncs.com/compatible-mode/v1'),
settings.get('max_discount_percent', 10),
settings.get('max_discount_amount', 100),
settings.get('max_bargain_rounds', 3),
settings.get('custom_prompts', '')
))
self.conn.commit()
logger.debug(f"AI回复设置保存成功: {cookie_id}")
return True
except Exception as e:
logger.error(f"保存AI回复设置失败: {e}")
self.conn.rollback()
return False
def get_ai_reply_settings(self, cookie_id: str) -> dict:
"""获取AI回复设置"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
SELECT ai_enabled, model_name, api_key, base_url,
max_discount_percent, max_discount_amount, max_bargain_rounds,
custom_prompts
FROM ai_reply_settings WHERE cookie_id = ?
''', (cookie_id,))
result = cursor.fetchone()
if result:
return {
'ai_enabled': bool(result[0]),
'model_name': result[1],
'api_key': result[2],
'base_url': result[3],
'max_discount_percent': result[4],
'max_discount_amount': result[5],
'max_bargain_rounds': result[6],
'custom_prompts': result[7]
}
else:
# 返回默认设置
return {
'ai_enabled': False,
'model_name': 'qwen-plus',
'api_key': '',
'base_url': 'https://dashscope.aliyuncs.com/compatible-mode/v1',
'max_discount_percent': 10,
'max_discount_amount': 100,
'max_bargain_rounds': 3,
'custom_prompts': ''
}
except Exception as e:
logger.error(f"获取AI回复设置失败: {e}")
return {
'ai_enabled': False,
'model_name': 'qwen-plus',
'api_key': '',
'base_url': 'https://dashscope.aliyuncs.com/compatible-mode/v1',
'max_discount_percent': 10,
'max_discount_amount': 100,
'max_bargain_rounds': 3,
'custom_prompts': ''
}
def get_all_ai_reply_settings(self) -> Dict[str, dict]:
"""获取所有账号的AI回复设置"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
SELECT cookie_id, ai_enabled, model_name, api_key, base_url,
max_discount_percent, max_discount_amount, max_bargain_rounds,
custom_prompts
FROM ai_reply_settings
''')
result = {}
for row in cursor.fetchall():
cookie_id = row[0]
result[cookie_id] = {
'ai_enabled': bool(row[1]),
'model_name': row[2],
'api_key': row[3],
'base_url': row[4],
'max_discount_percent': row[5],
'max_discount_amount': row[6],
'max_bargain_rounds': row[7],
'custom_prompts': row[8]
}
return result
except Exception as e:
logger.error(f"获取所有AI回复设置失败: {e}")
return {}
# -------------------- 默认回复操作 --------------------
def save_default_reply(self, cookie_id: str, enabled: bool, reply_content: str = None):
"""保存默认回复设置"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO default_replies (cookie_id, enabled, reply_content, updated_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
''', (cookie_id, enabled, reply_content))
self.conn.commit()
logger.debug(f"保存默认回复设置: {cookie_id} -> {'启用' if enabled else '禁用'}")
except Exception as e:
logger.error(f"保存默认回复设置失败: {e}")
raise
def get_default_reply(self, cookie_id: str) -> Optional[Dict[str, any]]:
"""获取指定账号的默认回复设置"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
SELECT enabled, reply_content FROM default_replies WHERE cookie_id = ?
''', (cookie_id,))
result = cursor.fetchone()
if result:
enabled, reply_content = result
return {
'enabled': bool(enabled),
'reply_content': reply_content or ''
}
return None
except Exception as e:
logger.error(f"获取默认回复设置失败: {e}")
return None
def get_all_default_replies(self) -> Dict[str, Dict[str, any]]:
"""获取所有账号的默认回复设置"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('SELECT cookie_id, enabled, reply_content FROM default_replies')
result = {}
for row in cursor.fetchall():
cookie_id, enabled, reply_content = row
result[cookie_id] = {
'enabled': bool(enabled),
'reply_content': reply_content or ''
}
return result
except Exception as e:
logger.error(f"获取所有默认回复设置失败: {e}")
return {}
def delete_default_reply(self, cookie_id: str) -> bool:
"""删除指定账号的默认回复设置"""
with self.lock:
try:
cursor = self.conn.cursor()
self._execute_sql(cursor, "DELETE FROM default_replies WHERE cookie_id = ?", (cookie_id,))
self.conn.commit()
logger.debug(f"删除默认回复设置: {cookie_id}")
return True
except Exception as e:
logger.error(f"删除默认回复设置失败: {e}")
self.conn.rollback()
return False
# -------------------- 通知渠道操作 --------------------
def create_notification_channel(self, name: str, channel_type: str, config: str, user_id: int = None) -> int:
"""创建通知渠道"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO notification_channels (name, type, config, user_id)
VALUES (?, ?, ?, ?)
''', (name, channel_type, config, user_id))
self.conn.commit()
channel_id = cursor.lastrowid
logger.debug(f"创建通知渠道: {name} (ID: {channel_id})")
return channel_id
except Exception as e:
logger.error(f"创建通知渠道失败: {e}")
self.conn.rollback()
raise
def get_notification_channels(self, user_id: int = None) -> List[Dict[str, any]]:
"""获取所有通知渠道"""
with self.lock:
try:
cursor = self.conn.cursor()
if user_id is not None:
cursor.execute('''
SELECT id, name, type, config, enabled, created_at, updated_at
FROM notification_channels
WHERE user_id = ?
ORDER BY created_at DESC
''', (user_id,))
else:
cursor.execute('''
SELECT id, name, type, config, enabled, created_at, updated_at
FROM notification_channels
ORDER BY created_at DESC
''')
channels = []
for row in cursor.fetchall():
channels.append({
'id': row[0],
'name': row[1],
'type': row[2],
'config': row[3],
'enabled': bool(row[4]),
'created_at': row[5],
'updated_at': row[6]
})
return channels
except Exception as e:
logger.error(f"获取通知渠道失败: {e}")
return []
def get_notification_channel(self, channel_id: int) -> Optional[Dict[str, any]]:
"""获取指定通知渠道"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
SELECT id, name, type, config, enabled, created_at, updated_at
FROM notification_channels WHERE id = ?
''', (channel_id,))
row = cursor.fetchone()
if row:
return {
'id': row[0],
'name': row[1],
'type': row[2],
'config': row[3],
'enabled': bool(row[4]),
'created_at': row[5],
'updated_at': row[6]
}
return None
except Exception as e:
logger.error(f"获取通知渠道失败: {e}")
return None
def update_notification_channel(self, channel_id: int, name: str, config: str, enabled: bool = True) -> bool:
"""更新通知渠道"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
UPDATE notification_channels
SET name = ?, config = ?, enabled = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (name, config, enabled, channel_id))
self.conn.commit()
logger.debug(f"更新通知渠道: {channel_id}")
return cursor.rowcount > 0
except Exception as e:
logger.error(f"更新通知渠道失败: {e}")
self.conn.rollback()
return False
def delete_notification_channel(self, channel_id: int) -> bool:
"""删除通知渠道"""
with self.lock:
try:
cursor = self.conn.cursor()
self._execute_sql(cursor, "DELETE FROM notification_channels WHERE id = ?", (channel_id,))
self.conn.commit()
logger.debug(f"删除通知渠道: {channel_id}")
return cursor.rowcount > 0
except Exception as e:
logger.error(f"删除通知渠道失败: {e}")
self.conn.rollback()
return False
# -------------------- 消息通知配置操作 --------------------
def set_message_notification(self, cookie_id: str, channel_id: int, enabled: bool = True) -> bool:
"""设置账号的消息通知"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO message_notifications (cookie_id, channel_id, enabled)
VALUES (?, ?, ?)
''', (cookie_id, channel_id, enabled))
self.conn.commit()
logger.debug(f"设置消息通知: {cookie_id} -> {channel_id}")
return True
except Exception as e:
logger.error(f"设置消息通知失败: {e}")
self.conn.rollback()
return False
def get_account_notifications(self, cookie_id: str) -> List[Dict[str, any]]:
"""获取账号的通知配置"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
SELECT mn.id, mn.channel_id, mn.enabled, nc.name, nc.type, nc.config
FROM message_notifications mn
JOIN notification_channels nc ON mn.channel_id = nc.id
WHERE mn.cookie_id = ? AND nc.enabled = 1
ORDER BY mn.id
''', (cookie_id,))
notifications = []
for row in cursor.fetchall():
notifications.append({
'id': row[0],
'channel_id': row[1],
'enabled': bool(row[2]),
'channel_name': row[3],
'channel_type': row[4],
'channel_config': row[5]
})
return notifications
except Exception as e:
logger.error(f"获取账号通知配置失败: {e}")
return []
def get_all_message_notifications(self) -> Dict[str, List[Dict[str, any]]]:
"""获取所有账号的通知配置"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
SELECT mn.cookie_id, mn.id, mn.channel_id, mn.enabled, nc.name, nc.type, nc.config
FROM message_notifications mn
JOIN notification_channels nc ON mn.channel_id = nc.id
WHERE nc.enabled = 1
ORDER BY mn.cookie_id, mn.id
''')
result = {}
for row in cursor.fetchall():
cookie_id = row[0]
if cookie_id not in result:
result[cookie_id] = []
result[cookie_id].append({
'id': row[1],
'channel_id': row[2],
'enabled': bool(row[3]),
'channel_name': row[4],
'channel_type': row[5],
'channel_config': row[6]
})
return result
except Exception as e:
logger.error(f"获取所有消息通知配置失败: {e}")
return {}
def delete_message_notification(self, notification_id: int) -> bool:
"""删除消息通知配置"""
with self.lock:
try:
cursor = self.conn.cursor()
self._execute_sql(cursor, "DELETE FROM message_notifications WHERE id = ?", (notification_id,))
self.conn.commit()
logger.debug(f"删除消息通知配置: {notification_id}")
return cursor.rowcount > 0
except Exception as e:
logger.error(f"删除消息通知配置失败: {e}")
self.conn.rollback()
return False
def delete_account_notifications(self, cookie_id: str) -> bool:
"""删除账号的所有消息通知配置"""
with self.lock:
try:
cursor = self.conn.cursor()
self._execute_sql(cursor, "DELETE FROM message_notifications WHERE cookie_id = ?", (cookie_id,))
self.conn.commit()
logger.debug(f"删除账号通知配置: {cookie_id}")
return cursor.rowcount > 0
except Exception as e:
logger.error(f"删除账号通知配置失败: {e}")
self.conn.rollback()
return False
# -------------------- 备份和恢复操作 --------------------
def export_backup(self, user_id: int = None) -> Dict[str, any]:
"""导出系统备份数据(支持用户隔离)"""
with self.lock:
try:
cursor = self.conn.cursor()
backup_data = {
'version': '1.0',
'timestamp': time.time(),
'user_id': user_id,
'data': {}
}
if user_id is not None:
# 用户级备份:只备份该用户的数据
# 备份用户的cookies
self._execute_sql(cursor, "SELECT * FROM cookies WHERE user_id = ?", (user_id,))
columns = [description[0] for description in cursor.description]
rows = cursor.fetchall()
backup_data['data']['cookies'] = {
'columns': columns,
'rows': [list(row) for row in rows]
}
# 备份用户cookies相关的其他数据
user_cookie_ids = [row[0] for row in rows] # 获取用户的cookie_id列表
if user_cookie_ids:
placeholders = ','.join(['?' for _ in user_cookie_ids])
# 备份关键字
cursor.execute(f"SELECT * FROM keywords WHERE cookie_id IN ({placeholders})", user_cookie_ids)
columns = [description[0] for description in cursor.description]
rows = cursor.fetchall()
backup_data['data']['keywords'] = {
'columns': columns,
'rows': [list(row) for row in rows]
}
# 备份其他相关表
related_tables = ['cookie_status', 'default_replies', 'message_notifications',
'item_info', 'ai_reply_settings', 'ai_conversations']
for table in related_tables:
cursor.execute(f"SELECT * FROM {table} WHERE cookie_id IN ({placeholders})", user_cookie_ids)
columns = [description[0] for description in cursor.description]
rows = cursor.fetchall()
backup_data['data'][table] = {
'columns': columns,
'rows': [list(row) for row in rows]
}
else:
# 系统级备份:备份所有数据
tables = [
'cookies', 'keywords', 'cookie_status', 'cards',
'delivery_rules', 'default_replies', 'notification_channels',
'message_notifications', 'system_settings', 'item_info',
'ai_reply_settings', 'ai_conversations', 'ai_item_cache'
]
for table in tables:
cursor.execute(f"SELECT * FROM {table}")
columns = [description[0] for description in cursor.description]
rows = cursor.fetchall()
backup_data['data'][table] = {
'columns': columns,
'rows': [list(row) for row in rows]
}
logger.info(f"导出备份成功用户ID: {user_id}")
return backup_data
except Exception as e:
logger.error(f"导出备份失败: {e}")
raise
def import_backup(self, backup_data: Dict[str, any], user_id: int = None) -> bool:
"""导入系统备份数据(支持用户隔离)"""
with self.lock:
try:
# 验证备份数据格式
if not isinstance(backup_data, dict) or 'data' not in backup_data:
raise ValueError("备份数据格式无效")
# 开始事务
cursor = self.conn.cursor()
self._execute_sql(cursor, "BEGIN TRANSACTION")
if user_id is not None:
# 用户级导入:只清空该用户的数据
# 获取用户的cookie_id列表
self._execute_sql(cursor, "SELECT id FROM cookies WHERE user_id = ?", (user_id,))
user_cookie_ids = [row[0] for row in cursor.fetchall()]
if user_cookie_ids:
placeholders = ','.join(['?' for _ in user_cookie_ids])
# 删除用户相关数据
related_tables = ['message_notifications', 'default_replies', 'item_info',
'cookie_status', 'keywords', 'ai_conversations', 'ai_reply_settings']
for table in related_tables:
cursor.execute(f"DELETE FROM {table} WHERE cookie_id IN ({placeholders})", user_cookie_ids)
# 删除用户的cookies
self._execute_sql(cursor, "DELETE FROM cookies WHERE user_id = ?", (user_id,))
else:
# 系统级导入:清空所有数据(除了用户和管理员密码)
tables = [
'message_notifications', 'notification_channels', 'default_replies',
'delivery_rules', 'cards', 'item_info', 'cookie_status', 'keywords',
'ai_conversations', 'ai_reply_settings', 'ai_item_cache', 'cookies'
]
for table in tables:
cursor.execute(f"DELETE FROM {table}")
# 清空系统设置(保留管理员密码)
self._execute_sql(cursor, "DELETE FROM system_settings WHERE key != 'admin_password_hash'")
# 导入数据
data = backup_data['data']
for table_name, table_data in data.items():
if table_name not in ['cookies', 'keywords', 'cookie_status', 'cards',
'delivery_rules', 'default_replies', 'notification_channels',
'message_notifications', 'system_settings', 'item_info',
'ai_reply_settings', 'ai_conversations', 'ai_item_cache']:
continue
columns = table_data['columns']
rows = table_data['rows']
if not rows:
continue
# 如果是用户级导入需要确保cookies表的user_id正确
if user_id is not None and table_name == 'cookies':
# 更新所有导入的cookies的user_id
updated_rows = []
for row in rows:
row_dict = dict(zip(columns, row))
row_dict['user_id'] = user_id
updated_rows.append([row_dict[col] for col in columns])
rows = updated_rows
# 构建插入语句
placeholders = ','.join(['?' for _ in columns])
if table_name == 'system_settings':
# 系统设置需要特殊处理,避免覆盖管理员密码
for row in rows:
if len(row) >= 1 and row[0] != 'admin_password_hash':
cursor.execute(f"INSERT INTO {table_name} ({','.join(columns)}) VALUES ({placeholders})", row)
else:
cursor.executemany(f"INSERT INTO {table_name} ({','.join(columns)}) VALUES ({placeholders})", rows)
# 提交事务
self.conn.commit()
logger.info("导入备份成功")
return True
except Exception as e:
logger.error(f"导入备份失败: {e}")
self.conn.rollback()
return False
# -------------------- 系统设置操作 --------------------
def get_system_setting(self, key: str) -> Optional[str]:
"""获取系统设置"""
with self.lock:
try:
cursor = self.conn.cursor()
self._execute_sql(cursor, "SELECT value FROM system_settings WHERE key = ?", (key,))
result = cursor.fetchone()
return result[0] if result else None
except Exception as e:
logger.error(f"获取系统设置失败: {e}")
return None
def set_system_setting(self, key: str, value: str, description: str = None) -> bool:
"""设置系统设置"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO system_settings (key, value, description, updated_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
''', (key, value, description))
self.conn.commit()
logger.debug(f"设置系统设置: {key}")
return True
except Exception as e:
logger.error(f"设置系统设置失败: {e}")
self.conn.rollback()
return False
def get_all_system_settings(self) -> Dict[str, str]:
"""获取所有系统设置"""
with self.lock:
try:
cursor = self.conn.cursor()
self._execute_sql(cursor, "SELECT key, value FROM system_settings")
settings = {}
for row in cursor.fetchall():
settings[row[0]] = row[1]
return settings
except Exception as e:
logger.error(f"获取所有系统设置失败: {e}")
return {}
# 管理员密码现在统一使用用户表管理,不再需要单独的方法
# ==================== 用户管理方法 ====================
def create_user(self, username: str, email: str, password: str) -> bool:
"""创建新用户"""
with self.lock:
try:
cursor = self.conn.cursor()
password_hash = hashlib.sha256(password.encode()).hexdigest()
cursor.execute('''
INSERT INTO users (username, email, password_hash)
VALUES (?, ?, ?)
''', (username, email, password_hash))
self.conn.commit()
logger.info(f"创建用户成功: {username} ({email})")
return True
except sqlite3.IntegrityError as e:
logger.error(f"创建用户失败,用户名或邮箱已存在: {e}")
self.conn.rollback()
return False
except Exception as e:
logger.error(f"创建用户失败: {e}")
self.conn.rollback()
return False
def get_user_by_username(self, username: str) -> Optional[Dict[str, Any]]:
"""根据用户名获取用户信息"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
SELECT id, username, email, password_hash, is_active, created_at, updated_at
FROM users WHERE username = ?
''', (username,))
row = cursor.fetchone()
if row:
return {
'id': row[0],
'username': row[1],
'email': row[2],
'password_hash': row[3],
'is_active': row[4],
'created_at': row[5],
'updated_at': row[6]
}
return None
except Exception as e:
logger.error(f"获取用户信息失败: {e}")
return None
def get_user_by_email(self, email: str) -> Optional[Dict[str, Any]]:
"""根据邮箱获取用户信息"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
SELECT id, username, email, password_hash, is_active, created_at, updated_at
FROM users WHERE email = ?
''', (email,))
row = cursor.fetchone()
if row:
return {
'id': row[0],
'username': row[1],
'email': row[2],
'password_hash': row[3],
'is_active': row[4],
'created_at': row[5],
'updated_at': row[6]
}
return None
except Exception as e:
logger.error(f"获取用户信息失败: {e}")
return None
def verify_user_password(self, username: str, password: str) -> bool:
"""验证用户密码"""
user = self.get_user_by_username(username)
if not user:
return False
password_hash = hashlib.sha256(password.encode()).hexdigest()
return user['password_hash'] == password_hash and user['is_active']
def update_user_password(self, username: str, new_password: str) -> bool:
"""更新用户密码"""
with self.lock:
try:
cursor = self.conn.cursor()
password_hash = hashlib.sha256(new_password.encode()).hexdigest()
cursor.execute('''
UPDATE users SET password_hash = ?, updated_at = CURRENT_TIMESTAMP
WHERE username = ?
''', (password_hash, username))
if cursor.rowcount > 0:
self.conn.commit()
logger.info(f"用户 {username} 密码更新成功")
return True
else:
logger.warning(f"用户 {username} 不存在,密码更新失败")
return False
except Exception as e:
logger.error(f"更新用户密码失败: {e}")
self.conn.rollback()
return False
def generate_verification_code(self) -> str:
"""生成6位数字验证码"""
return ''.join(random.choices(string.digits, k=6))
def generate_captcha(self) -> Tuple[str, str]:
"""生成图形验证码
返回: (验证码文本, base64编码的图片)
"""
try:
# 生成4位随机验证码数字+字母)
chars = string.ascii_uppercase + string.digits
captcha_text = ''.join(random.choices(chars, k=4))
# 创建图片
width, height = 120, 40
image = Image.new('RGB', (width, height), color='white')
draw = ImageDraw.Draw(image)
# 尝试使用系统字体,如果失败则使用默认字体
try:
# Windows系统字体
font = ImageFont.truetype("arial.ttf", 20)
except:
try:
# 备用字体
font = ImageFont.truetype("C:/Windows/Fonts/arial.ttf", 20)
except:
# 使用默认字体
font = ImageFont.load_default()
# 绘制验证码文本
for i, char in enumerate(captcha_text):
# 随机颜色
color = (
random.randint(0, 100),
random.randint(0, 100),
random.randint(0, 100)
)
# 随机位置(稍微偏移)
x = 20 + i * 20 + random.randint(-3, 3)
y = 8 + random.randint(-3, 3)
draw.text((x, y), char, font=font, fill=color)
# 添加干扰线
for _ in range(3):
start = (random.randint(0, width), random.randint(0, height))
end = (random.randint(0, width), random.randint(0, height))
draw.line([start, end], fill=(random.randint(100, 200), random.randint(100, 200), random.randint(100, 200)), width=1)
# 添加干扰点
for _ in range(20):
x = random.randint(0, width)
y = random.randint(0, height)
draw.point((x, y), fill=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)))
# 转换为base64
buffer = io.BytesIO()
image.save(buffer, format='PNG')
img_base64 = base64.b64encode(buffer.getvalue()).decode()
return captcha_text, f"data:image/png;base64,{img_base64}"
except Exception as e:
logger.error(f"生成图形验证码失败: {e}")
# 返回简单的文本验证码作为备用
simple_code = ''.join(random.choices(string.digits, k=4))
return simple_code, ""
def save_captcha(self, session_id: str, captcha_text: str, expires_minutes: int = 5) -> bool:
"""保存图形验证码"""
with self.lock:
try:
cursor = self.conn.cursor()
expires_at = time.time() + (expires_minutes * 60)
# 删除该session的旧验证码
cursor.execute('DELETE FROM captcha_codes WHERE session_id = ?', (session_id,))
cursor.execute('''
INSERT INTO captcha_codes (session_id, code, expires_at)
VALUES (?, ?, ?)
''', (session_id, captcha_text.upper(), expires_at))
self.conn.commit()
logger.debug(f"保存图形验证码成功: {session_id}")
return True
except Exception as e:
logger.error(f"保存图形验证码失败: {e}")
self.conn.rollback()
return False
def verify_captcha(self, session_id: str, user_input: str) -> bool:
"""验证图形验证码"""
with self.lock:
try:
cursor = self.conn.cursor()
current_time = time.time()
# 查找有效的验证码
cursor.execute('''
SELECT id FROM captcha_codes
WHERE session_id = ? AND code = ? AND expires_at > ?
ORDER BY created_at DESC LIMIT 1
''', (session_id, user_input.upper(), current_time))
row = cursor.fetchone()
if row:
# 删除已使用的验证码
cursor.execute('DELETE FROM captcha_codes WHERE id = ?', (row[0],))
self.conn.commit()
logger.debug(f"图形验证码验证成功: {session_id}")
return True
else:
logger.warning(f"图形验证码验证失败: {session_id} - {user_input}")
return False
except Exception as e:
logger.error(f"验证图形验证码失败: {e}")
return False
def save_verification_code(self, email: str, code: str, code_type: str = 'register', expires_minutes: int = 10) -> bool:
"""保存邮箱验证码"""
with self.lock:
try:
cursor = self.conn.cursor()
expires_at = time.time() + (expires_minutes * 60)
cursor.execute('''
INSERT INTO email_verifications (email, code, type, expires_at)
VALUES (?, ?, ?, ?)
''', (email, code, code_type, expires_at))
self.conn.commit()
logger.info(f"保存验证码成功: {email} ({code_type})")
return True
except Exception as e:
logger.error(f"保存验证码失败: {e}")
self.conn.rollback()
return False
def verify_email_code(self, email: str, code: str, code_type: str = 'register') -> bool:
"""验证邮箱验证码"""
with self.lock:
try:
cursor = self.conn.cursor()
current_time = time.time()
# 查找有效的验证码
cursor.execute('''
SELECT id FROM email_verifications
WHERE email = ? AND code = ? AND type = ? AND expires_at > ? AND used = FALSE
ORDER BY created_at DESC LIMIT 1
''', (email, code, code_type, current_time))
row = cursor.fetchone()
if row:
# 标记验证码为已使用
cursor.execute('''
UPDATE email_verifications SET used = TRUE WHERE id = ?
''', (row[0],))
self.conn.commit()
logger.info(f"验证码验证成功: {email} ({code_type})")
return True
else:
logger.warning(f"验证码验证失败: {email} - {code} ({code_type})")
return False
except Exception as e:
logger.error(f"验证邮箱验证码失败: {e}")
return False
async def send_verification_email(self, email: str, code: str) -> bool:
"""发送验证码邮件"""
try:
subject = "闲鱼自动回复系统 - 邮箱验证码"
# 使用简单的纯文本邮件内容
text_content = f"""【闲鱼自动回复系统】邮箱验证码
您好!
感谢您使用闲鱼自动回复系统。为了确保账户安全,请使用以下验证码完成邮箱验证:
验证码:{code}
重要提醒:
• 验证码有效期为 10 分钟,请及时使用
• 请勿将验证码分享给任何人
• 如非本人操作,请忽略此邮件
• 系统不会主动索要您的验证码
如果您在使用过程中遇到任何问题,请联系我们的技术支持团队。
感谢您选择闲鱼自动回复系统!
---
此邮件由系统自动发送,请勿直接回复
© 2025 闲鱼自动回复系统"""
# 使用GET请求发送邮件
api_url = "https://dy.zhinianboke.com/api/emailSend"
params = {
'subject': subject,
'receiveUser': email,
'sendHtml': text_content
}
async with aiohttp.ClientSession() as session:
try:
logger.info(f"发送验证码邮件: {email}")
async with session.get(api_url, params=params, timeout=15) as response:
response_text = await response.text()
logger.info(f"邮件API响应: {response.status}")
if response.status == 200:
logger.info(f"验证码邮件发送成功: {email}")
return True
else:
logger.error(f"验证码邮件发送失败: {email}, 状态码: {response.status}, 响应: {response_text[:200]}")
return False
except Exception as e:
logger.error(f"邮件发送异常: {email}, 错误: {e}")
return False
except Exception as e:
logger.error(f"发送验证码邮件异常: {e}")
return False
# ==================== 卡券管理方法 ====================
def create_card(self, name: str, card_type: str, api_config=None,
text_content: str = None, data_content: str = None,
description: str = None, enabled: bool = True, delay_seconds: int = 0,
is_multi_spec: bool = False, spec_name: str = None, spec_value: str = None,
user_id: int = None):
"""创建新卡券(支持多规格)"""
with self.lock:
try:
# 验证多规格参数
if is_multi_spec:
if not spec_name or not spec_value:
raise ValueError("多规格卡券必须提供规格名称和规格值")
# 检查唯一性:卡券名称+规格名称+规格值
cursor = self.conn.cursor()
cursor.execute('''
SELECT COUNT(*) FROM cards
WHERE name = ? AND spec_name = ? AND spec_value = ? AND user_id = ?
''', (name, spec_name, spec_value, user_id))
if cursor.fetchone()[0] > 0:
raise ValueError(f"卡券已存在:{name} - {spec_name}:{spec_value}")
else:
# 检查唯一性:仅卡券名称
cursor = self.conn.cursor()
cursor.execute('''
SELECT COUNT(*) FROM cards
WHERE name = ? AND (is_multi_spec = 0 OR is_multi_spec IS NULL) AND user_id = ?
''', (name, user_id))
if cursor.fetchone()[0] > 0:
raise ValueError(f"卡券名称已存在:{name}")
# 处理api_config参数 - 如果是字典则转换为JSON字符串
api_config_str = None
if api_config is not None:
if isinstance(api_config, dict):
import json
api_config_str = json.dumps(api_config)
else:
api_config_str = str(api_config)
cursor.execute('''
INSERT INTO cards (name, type, api_config, text_content, data_content,
description, enabled, delay_seconds, is_multi_spec,
spec_name, spec_value, user_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (name, card_type, api_config_str, text_content, data_content,
description, enabled, delay_seconds, is_multi_spec,
spec_name, spec_value, user_id))
self.conn.commit()
card_id = cursor.lastrowid
if is_multi_spec:
logger.info(f"创建多规格卡券成功: {name} - {spec_name}:{spec_value} (ID: {card_id})")
else:
logger.info(f"创建卡券成功: {name} (ID: {card_id})")
return card_id
except Exception as e:
logger.error(f"创建卡券失败: {e}")
raise
def get_all_cards(self, user_id: int = None):
"""获取所有卡券(支持用户隔离)"""
with self.lock:
try:
cursor = self.conn.cursor()
if user_id is not None:
cursor.execute('''
SELECT id, name, type, api_config, text_content, data_content,
description, enabled, delay_seconds, is_multi_spec,
spec_name, spec_value, created_at, updated_at
FROM cards
WHERE user_id = ?
ORDER BY created_at DESC
''', (user_id,))
else:
cursor.execute('''
SELECT id, name, type, api_config, text_content, data_content,
description, enabled, delay_seconds, is_multi_spec,
spec_name, spec_value, created_at, updated_at
FROM cards
ORDER BY created_at DESC
''')
cards = []
for row in cursor.fetchall():
# 解析api_config JSON字符串
api_config = row[3]
if api_config:
try:
import json
api_config = json.loads(api_config)
except (json.JSONDecodeError, TypeError):
# 如果解析失败,保持原始字符串
pass
cards.append({
'id': row[0],
'name': row[1],
'type': row[2],
'api_config': api_config,
'text_content': row[4],
'data_content': row[5],
'description': row[6],
'enabled': bool(row[7]),
'delay_seconds': row[8] or 0,
'is_multi_spec': bool(row[9]) if row[9] is not None else False,
'spec_name': row[10],
'spec_value': row[11],
'created_at': row[12],
'updated_at': row[13]
})
return cards
except Exception as e:
logger.error(f"获取卡券列表失败: {e}")
return []
def get_card_by_id(self, card_id: int, user_id: int = None):
"""根据ID获取卡券支持用户隔离"""
with self.lock:
try:
cursor = self.conn.cursor()
if user_id is not None:
cursor.execute('''
SELECT id, name, type, api_config, text_content, data_content,
description, enabled, delay_seconds, is_multi_spec,
spec_name, spec_value, created_at, updated_at
FROM cards WHERE id = ? AND user_id = ?
''', (card_id, user_id))
else:
cursor.execute('''
SELECT id, name, type, api_config, text_content, data_content,
description, enabled, delay_seconds, is_multi_spec,
spec_name, spec_value, created_at, updated_at
FROM cards WHERE id = ?
''', (card_id,))
row = cursor.fetchone()
if row:
# 解析api_config JSON字符串
api_config = row[3]
if api_config:
try:
import json
api_config = json.loads(api_config)
except (json.JSONDecodeError, TypeError):
# 如果解析失败,保持原始字符串
pass
return {
'id': row[0],
'name': row[1],
'type': row[2],
'api_config': api_config,
'text_content': row[4],
'data_content': row[5],
'description': row[6],
'enabled': bool(row[7]),
'delay_seconds': row[8] or 0,
'is_multi_spec': bool(row[9]) if row[9] is not None else False,
'spec_name': row[10],
'spec_value': row[11],
'created_at': row[12],
'updated_at': row[13]
}
return None
except Exception as e:
logger.error(f"获取卡券失败: {e}")
return None
def update_card(self, card_id: int, name: str = None, card_type: str = None,
api_config=None, text_content: str = None, data_content: str = None,
description: str = None, enabled: bool = None, delay_seconds: int = None,
is_multi_spec: bool = None, spec_name: str = None, spec_value: str = None):
"""更新卡券"""
with self.lock:
try:
# 处理api_config参数
api_config_str = None
if api_config is not None:
if isinstance(api_config, dict):
import json
api_config_str = json.dumps(api_config)
else:
api_config_str = str(api_config)
cursor = self.conn.cursor()
# 构建更新语句
update_fields = []
params = []
if name is not None:
update_fields.append("name = ?")
params.append(name)
if card_type is not None:
update_fields.append("type = ?")
params.append(card_type)
if api_config_str is not None:
update_fields.append("api_config = ?")
params.append(api_config_str)
if text_content is not None:
update_fields.append("text_content = ?")
params.append(text_content)
if data_content is not None:
update_fields.append("data_content = ?")
params.append(data_content)
if description is not None:
update_fields.append("description = ?")
params.append(description)
if enabled is not None:
update_fields.append("enabled = ?")
params.append(enabled)
if delay_seconds is not None:
update_fields.append("delay_seconds = ?")
params.append(delay_seconds)
if is_multi_spec is not None:
update_fields.append("is_multi_spec = ?")
params.append(is_multi_spec)
if spec_name is not None:
update_fields.append("spec_name = ?")
params.append(spec_name)
if spec_value is not None:
update_fields.append("spec_value = ?")
params.append(spec_value)
if not update_fields:
return True # 没有需要更新的字段
update_fields.append("updated_at = CURRENT_TIMESTAMP")
params.append(card_id)
sql = f"UPDATE cards SET {', '.join(update_fields)} WHERE id = ?"
self._execute_sql(cursor, sql, params)
if cursor.rowcount > 0:
self.conn.commit()
logger.info(f"更新卡券成功: ID {card_id}")
return True
else:
return False # 没有找到对应的记录
except Exception as e:
logger.error(f"更新卡券失败: {e}")
self.conn.rollback()
raise
# ==================== 自动发货规则方法 ====================
def create_delivery_rule(self, keyword: str, card_id: int, delivery_count: int = 1,
enabled: bool = True, description: str = None, user_id: int = None):
"""创建发货规则"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO delivery_rules (keyword, card_id, delivery_count, enabled, description, user_id)
VALUES (?, ?, ?, ?, ?, ?)
''', (keyword, card_id, delivery_count, enabled, description, user_id))
self.conn.commit()
rule_id = cursor.lastrowid
logger.info(f"创建发货规则成功: {keyword} -> 卡券ID {card_id} (规则ID: {rule_id})")
return rule_id
except Exception as e:
logger.error(f"创建发货规则失败: {e}")
raise
def get_all_delivery_rules(self, user_id: int = None):
"""获取所有发货规则"""
with self.lock:
try:
cursor = self.conn.cursor()
if user_id is not None:
cursor.execute('''
SELECT dr.id, dr.keyword, dr.card_id, dr.delivery_count, dr.enabled,
dr.description, dr.delivery_times, dr.created_at, dr.updated_at,
c.name as card_name, c.type as card_type,
c.is_multi_spec, c.spec_name, c.spec_value
FROM delivery_rules dr
LEFT JOIN cards c ON dr.card_id = c.id
WHERE dr.user_id = ?
ORDER BY dr.created_at DESC
''', (user_id,))
else:
cursor.execute('''
SELECT dr.id, dr.keyword, dr.card_id, dr.delivery_count, dr.enabled,
dr.description, dr.delivery_times, dr.created_at, dr.updated_at,
c.name as card_name, c.type as card_type,
c.is_multi_spec, c.spec_name, c.spec_value
FROM delivery_rules dr
LEFT JOIN cards c ON dr.card_id = c.id
ORDER BY dr.created_at DESC
''')
rules = []
for row in cursor.fetchall():
rules.append({
'id': row[0],
'keyword': row[1],
'card_id': row[2],
'delivery_count': row[3],
'enabled': bool(row[4]),
'description': row[5],
'delivery_times': row[6],
'created_at': row[7],
'updated_at': row[8],
'card_name': row[9],
'card_type': row[10],
'is_multi_spec': bool(row[11]) if row[11] is not None else False,
'spec_name': row[12],
'spec_value': row[13]
})
return rules
except Exception as e:
logger.error(f"获取发货规则列表失败: {e}")
return []
def get_delivery_rules_by_keyword(self, keyword: str):
"""根据关键字获取匹配的发货规则"""
with self.lock:
try:
cursor = self.conn.cursor()
# 使用更灵活的匹配方式:既支持商品内容包含关键字,也支持关键字包含在商品内容中
cursor.execute('''
SELECT dr.id, dr.keyword, dr.card_id, dr.delivery_count, dr.enabled,
dr.description, dr.delivery_times,
c.name as card_name, c.type as card_type, c.api_config,
c.text_content, c.data_content, c.enabled as card_enabled, c.description as card_description,
c.delay_seconds as card_delay_seconds
FROM delivery_rules dr
LEFT JOIN cards c ON dr.card_id = c.id
WHERE dr.enabled = 1 AND c.enabled = 1
AND (? LIKE '%' || dr.keyword || '%' OR dr.keyword LIKE '%' || ? || '%')
ORDER BY
CASE
WHEN ? LIKE '%' || dr.keyword || '%' THEN LENGTH(dr.keyword)
ELSE LENGTH(dr.keyword) / 2
END DESC,
dr.id ASC
''', (keyword, keyword, keyword))
rules = []
for row in cursor.fetchall():
# 解析api_config JSON字符串
api_config = row[9]
if api_config:
try:
import json
api_config = json.loads(api_config)
except (json.JSONDecodeError, TypeError):
# 如果解析失败,保持原始字符串
pass
rules.append({
'id': row[0],
'keyword': row[1],
'card_id': row[2],
'delivery_count': row[3],
'enabled': bool(row[4]),
'description': row[5],
'delivery_times': row[6],
'card_name': row[7],
'card_type': row[8],
'api_config': api_config, # 修复字段名
'text_content': row[10],
'data_content': row[11],
'card_enabled': bool(row[12]),
'card_description': row[13], # 卡券备注信息
'card_delay_seconds': row[14] or 0 # 延时秒数
})
return rules
except Exception as e:
logger.error(f"根据关键字获取发货规则失败: {e}")
return []
def get_delivery_rule_by_id(self, rule_id: int, user_id: int = None):
"""根据ID获取发货规则支持用户隔离"""
with self.lock:
try:
cursor = self.conn.cursor()
if user_id is not None:
self._execute_sql(cursor, '''
SELECT dr.id, dr.keyword, dr.card_id, dr.delivery_count, dr.enabled,
dr.description, dr.delivery_times, dr.created_at, dr.updated_at,
c.name as card_name, c.type as card_type
FROM delivery_rules dr
LEFT JOIN cards c ON dr.card_id = c.id
WHERE dr.id = ? AND dr.user_id = ?
''', (rule_id, user_id))
else:
self._execute_sql(cursor, '''
SELECT dr.id, dr.keyword, dr.card_id, dr.delivery_count, dr.enabled,
dr.description, dr.delivery_times, dr.created_at, dr.updated_at,
c.name as card_name, c.type as card_type
FROM delivery_rules dr
LEFT JOIN cards c ON dr.card_id = c.id
WHERE dr.id = ?
''', (rule_id,))
row = cursor.fetchone()
if row:
return {
'id': row[0],
'keyword': row[1],
'card_id': row[2],
'delivery_count': row[3],
'enabled': bool(row[4]),
'description': row[5],
'delivery_times': row[6],
'created_at': row[7],
'updated_at': row[8],
'card_name': row[9],
'card_type': row[10]
}
return None
except Exception as e:
logger.error(f"获取发货规则失败: {e}")
return None
def update_delivery_rule(self, rule_id: int, keyword: str = None, card_id: int = None,
delivery_count: int = None, enabled: bool = None,
description: str = None, user_id: int = None):
"""更新发货规则(支持用户隔离)"""
with self.lock:
try:
cursor = self.conn.cursor()
# 构建更新语句
update_fields = []
params = []
if keyword is not None:
update_fields.append("keyword = ?")
params.append(keyword)
if card_id is not None:
update_fields.append("card_id = ?")
params.append(card_id)
if delivery_count is not None:
update_fields.append("delivery_count = ?")
params.append(delivery_count)
if enabled is not None:
update_fields.append("enabled = ?")
params.append(enabled)
if description is not None:
update_fields.append("description = ?")
params.append(description)
if not update_fields:
return True # 没有需要更新的字段
update_fields.append("updated_at = CURRENT_TIMESTAMP")
params.append(rule_id)
if user_id is not None:
params.append(user_id)
sql = f"UPDATE delivery_rules SET {', '.join(update_fields)} WHERE id = ? AND user_id = ?"
else:
sql = f"UPDATE delivery_rules SET {', '.join(update_fields)} WHERE id = ?"
self._execute_sql(cursor, sql, params)
if cursor.rowcount > 0:
self.conn.commit()
logger.info(f"更新发货规则成功: ID {rule_id}")
return True
else:
return False # 没有找到对应的记录
except Exception as e:
logger.error(f"更新发货规则失败: {e}")
self.conn.rollback()
raise
def increment_delivery_times(self, rule_id: int):
"""增加发货次数"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
UPDATE delivery_rules
SET delivery_times = delivery_times + 1, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (rule_id,))
self.conn.commit()
logger.debug(f"发货规则 {rule_id} 发货次数已增加")
except Exception as e:
logger.error(f"更新发货次数失败: {e}")
def get_delivery_rules_by_keyword_and_spec(self, keyword: str, spec_name: str = None, spec_value: str = None):
"""根据关键字和规格信息获取匹配的发货规则(支持多规格)"""
with self.lock:
try:
cursor = self.conn.cursor()
# 优先匹配:卡券名称+规格名称+规格值
if spec_name and spec_value:
cursor.execute('''
SELECT dr.id, dr.keyword, dr.card_id, dr.delivery_count, dr.enabled,
dr.description, dr.delivery_times,
c.name as card_name, c.type as card_type, c.api_config,
c.text_content, c.data_content, c.enabled as card_enabled,
c.description as card_description, c.delay_seconds as card_delay_seconds,
c.is_multi_spec, c.spec_name, c.spec_value
FROM delivery_rules dr
LEFT JOIN cards c ON dr.card_id = c.id
WHERE dr.enabled = 1 AND c.enabled = 1
AND (? LIKE '%' || dr.keyword || '%' OR dr.keyword LIKE '%' || ? || '%')
AND c.is_multi_spec = 1 AND c.spec_name = ? AND c.spec_value = ?
ORDER BY
CASE
WHEN ? LIKE '%' || dr.keyword || '%' THEN LENGTH(dr.keyword)
ELSE LENGTH(dr.keyword) / 2
END DESC,
dr.delivery_times ASC
''', (keyword, keyword, spec_name, spec_value, keyword))
rules = []
for row in cursor.fetchall():
# 解析api_config JSON字符串
api_config = row[9]
if api_config:
try:
import json
api_config = json.loads(api_config)
except (json.JSONDecodeError, TypeError):
# 如果解析失败,保持原始字符串
pass
rules.append({
'id': row[0],
'keyword': row[1],
'card_id': row[2],
'delivery_count': row[3],
'enabled': bool(row[4]),
'description': row[5],
'delivery_times': row[6] or 0,
'card_name': row[7],
'card_type': row[8],
'api_config': api_config,
'text_content': row[10],
'data_content': row[11],
'card_enabled': bool(row[12]),
'card_description': row[13],
'card_delay_seconds': row[14] or 0,
'is_multi_spec': bool(row[15]),
'spec_name': row[16],
'spec_value': row[17]
})
if rules:
logger.info(f"找到多规格匹配规则: {keyword} - {spec_name}:{spec_value}")
return rules
# 兜底匹配:仅卡券名称
cursor.execute('''
SELECT dr.id, dr.keyword, dr.card_id, dr.delivery_count, dr.enabled,
dr.description, dr.delivery_times,
c.name as card_name, c.type as card_type, c.api_config,
c.text_content, c.data_content, c.enabled as card_enabled,
c.description as card_description, c.delay_seconds as card_delay_seconds,
c.is_multi_spec, c.spec_name, c.spec_value
FROM delivery_rules dr
LEFT JOIN cards c ON dr.card_id = c.id
WHERE dr.enabled = 1 AND c.enabled = 1
AND (? LIKE '%' || dr.keyword || '%' OR dr.keyword LIKE '%' || ? || '%')
AND (c.is_multi_spec = 0 OR c.is_multi_spec IS NULL)
ORDER BY
CASE
WHEN ? LIKE '%' || dr.keyword || '%' THEN LENGTH(dr.keyword)
ELSE LENGTH(dr.keyword) / 2
END DESC,
dr.delivery_times ASC
''', (keyword, keyword, keyword))
rules = []
for row in cursor.fetchall():
# 解析api_config JSON字符串
api_config = row[9]
if api_config:
try:
import json
api_config = json.loads(api_config)
except (json.JSONDecodeError, TypeError):
# 如果解析失败,保持原始字符串
pass
rules.append({
'id': row[0],
'keyword': row[1],
'card_id': row[2],
'delivery_count': row[3],
'enabled': bool(row[4]),
'description': row[5],
'delivery_times': row[6] or 0,
'card_name': row[7],
'card_type': row[8],
'api_config': api_config,
'text_content': row[10],
'data_content': row[11],
'card_enabled': bool(row[12]),
'card_description': row[13],
'card_delay_seconds': row[14] or 0,
'is_multi_spec': bool(row[15]) if row[15] is not None else False,
'spec_name': row[16],
'spec_value': row[17]
})
if rules:
logger.info(f"找到兜底匹配规则: {keyword}")
else:
logger.info(f"未找到匹配规则: {keyword}")
return rules
except Exception as e:
logger.error(f"获取发货规则失败: {e}")
return []
def delete_card(self, card_id: int):
"""删除卡券"""
with self.lock:
try:
cursor = self.conn.cursor()
self._execute_sql(cursor, "DELETE FROM cards WHERE id = ?", (card_id,))
if cursor.rowcount > 0:
self.conn.commit()
logger.info(f"删除卡券成功: ID {card_id}")
return True
else:
return False # 没有找到对应的记录
except Exception as e:
logger.error(f"删除卡券失败: {e}")
self.conn.rollback()
raise
def delete_delivery_rule(self, rule_id: int, user_id: int = None):
"""删除发货规则(支持用户隔离)"""
with self.lock:
try:
cursor = self.conn.cursor()
if user_id is not None:
self._execute_sql(cursor, "DELETE FROM delivery_rules WHERE id = ? AND user_id = ?", (rule_id, user_id))
else:
self._execute_sql(cursor, "DELETE FROM delivery_rules WHERE id = ?", (rule_id,))
if cursor.rowcount > 0:
self.conn.commit()
logger.info(f"删除发货规则成功: ID {rule_id} (用户ID: {user_id})")
return True
else:
return False # 没有找到对应的记录
except Exception as e:
logger.error(f"删除发货规则失败: {e}")
self.conn.rollback()
raise
def consume_batch_data(self, card_id: int):
"""消费批量数据的第一条记录(线程安全)"""
with self.lock:
try:
cursor = self.conn.cursor()
# 获取卡券的批量数据
self._execute_sql(cursor, "SELECT data_content FROM cards WHERE id = ? AND type = 'data'", (card_id,))
result = cursor.fetchone()
if not result or not result[0]:
logger.warning(f"卡券 {card_id} 没有批量数据")
return None
data_content = result[0]
lines = [line.strip() for line in data_content.split('\n') if line.strip()]
if not lines:
logger.warning(f"卡券 {card_id} 批量数据为空")
return None
# 获取第一条数据
first_line = lines[0]
# 移除第一条数据,更新数据库
remaining_lines = lines[1:]
new_data_content = '\n'.join(remaining_lines)
cursor.execute('''
UPDATE cards
SET data_content = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (new_data_content, card_id))
self.conn.commit()
logger.info(f"消费批量数据成功: 卡券ID={card_id}, 剩余={len(remaining_lines)}")
return first_line
except Exception as e:
logger.error(f"消费批量数据失败: {e}")
self.conn.rollback()
return None
# ==================== 商品信息管理 ====================
def save_item_basic_info(self, cookie_id: str, item_id: str, item_title: str = None,
item_description: str = None, item_category: str = None,
item_price: str = None, item_detail: str = None) -> bool:
"""保存或更新商品基本信息,使用原子操作避免并发问题
Args:
cookie_id: Cookie ID
item_id: 商品ID
item_title: 商品标题
item_description: 商品描述
item_category: 商品分类
item_price: 商品价格
item_detail: 商品详情JSON
Returns:
bool: 操作是否成功
"""
try:
with self.lock:
cursor = self.conn.cursor()
# 使用 INSERT OR IGNORE + UPDATE 的原子操作模式
# 首先尝试插入,如果已存在则忽略
cursor.execute('''
INSERT OR IGNORE INTO item_info (cookie_id, item_id, item_title, item_description,
item_category, item_price, item_detail, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
''', (cookie_id, item_id, item_title or '', item_description or '',
item_category or '', item_price or '', item_detail or ''))
# 如果是新插入的记录,直接返回成功
if cursor.rowcount > 0:
self.conn.commit()
logger.info(f"新增商品基本信息: {item_id} - {item_title}")
return True
# 记录已存在使用原子UPDATE操作只更新非空字段且不覆盖现有非空值
update_parts = []
params = []
# 使用 CASE WHEN 语句进行条件更新,避免覆盖现有数据
if item_title:
update_parts.append("item_title = CASE WHEN (item_title IS NULL OR item_title = '') THEN ? ELSE item_title END")
params.append(item_title)
if item_description:
update_parts.append("item_description = CASE WHEN (item_description IS NULL OR item_description = '') THEN ? ELSE item_description END")
params.append(item_description)
if item_category:
update_parts.append("item_category = CASE WHEN (item_category IS NULL OR item_category = '') THEN ? ELSE item_category END")
params.append(item_category)
if item_price:
update_parts.append("item_price = CASE WHEN (item_price IS NULL OR item_price = '') THEN ? ELSE item_price END")
params.append(item_price)
# 对于item_detail只有在现有值为空时才更新
if item_detail:
update_parts.append("item_detail = CASE WHEN (item_detail IS NULL OR item_detail = '' OR TRIM(item_detail) = '') THEN ? ELSE item_detail END")
params.append(item_detail)
if update_parts:
update_parts.append("updated_at = CURRENT_TIMESTAMP")
params.extend([cookie_id, item_id])
sql = f"UPDATE item_info SET {', '.join(update_parts)} WHERE cookie_id = ? AND item_id = ?"
self._execute_sql(cursor, sql, params)
if cursor.rowcount > 0:
logger.info(f"更新商品基本信息: {item_id} - {item_title}")
else:
logger.debug(f"商品信息无需更新: {item_id}")
self.conn.commit()
return True
except Exception as e:
logger.error(f"保存商品基本信息失败: {e}")
self.conn.rollback()
return False
def save_item_info(self, cookie_id: str, item_id: str, item_data = None) -> bool:
"""保存或更新商品信息
Args:
cookie_id: Cookie ID
item_id: 商品ID
item_data: 商品详情数据可以是字符串或字典也可以为None
Returns:
bool: 操作是否成功
"""
try:
with self.lock:
cursor = self.conn.cursor()
# 检查商品是否已存在
cursor.execute('''
SELECT id, item_detail FROM item_info
WHERE cookie_id = ? AND item_id = ?
''', (cookie_id, item_id))
existing = cursor.fetchone()
if existing:
# 如果传入的商品详情有值,则用最新数据覆盖
if item_data is not None and item_data:
# 处理字符串类型的详情数据
if isinstance(item_data, str):
cursor.execute('''
UPDATE item_info SET
item_detail = ?, updated_at = CURRENT_TIMESTAMP
WHERE cookie_id = ? AND item_id = ?
''', (item_data, cookie_id, item_id))
else:
# 处理字典类型的详情数据(向后兼容)
cursor.execute('''
UPDATE item_info SET
item_title = ?, item_description = ?, item_category = ?,
item_price = ?, item_detail = ?, updated_at = CURRENT_TIMESTAMP
WHERE cookie_id = ? AND item_id = ?
''', (
item_data.get('title', ''),
item_data.get('description', ''),
item_data.get('category', ''),
item_data.get('price', ''),
json.dumps(item_data, ensure_ascii=False),
cookie_id, item_id
))
logger.info(f"更新商品信息(覆盖): {item_id}")
else:
# 如果商品详情没有数据,则不更新,只记录存在
logger.debug(f"商品信息已存在,无新数据,跳过更新: {item_id}")
return True
else:
# 新增商品信息
if isinstance(item_data, str):
# 直接保存字符串详情
cursor.execute('''
INSERT INTO item_info (cookie_id, item_id, item_detail)
VALUES (?, ?, ?)
''', (cookie_id, item_id, item_data))
else:
# 处理字典类型的详情数据(向后兼容)
cursor.execute('''
INSERT INTO item_info (cookie_id, item_id, item_title, item_description,
item_category, item_price, item_detail)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
cookie_id, item_id,
item_data.get('title', '') if item_data else '',
item_data.get('description', '') if item_data else '',
item_data.get('category', '') if item_data else '',
item_data.get('price', '') if item_data else '',
json.dumps(item_data, ensure_ascii=False) if item_data else ''
))
logger.info(f"新增商品信息: {item_id}")
self.conn.commit()
return True
except Exception as e:
logger.error(f"保存商品信息失败: {e}")
return False
def get_item_info(self, cookie_id: str, item_id: str) -> Optional[Dict]:
"""获取商品信息
Args:
cookie_id: Cookie ID
item_id: 商品ID
Returns:
Dict: 商品信息如果不存在返回None
"""
try:
with self.lock:
cursor = self.conn.cursor()
cursor.execute('''
SELECT * FROM item_info
WHERE cookie_id = ? AND item_id = ?
''', (cookie_id, item_id))
row = cursor.fetchone()
if row:
columns = [description[0] for description in cursor.description]
item_info = dict(zip(columns, row))
# 解析item_detail JSON
if item_info.get('item_detail'):
try:
item_info['item_detail_parsed'] = json.loads(item_info['item_detail'])
except:
item_info['item_detail_parsed'] = {}
return item_info
return None
except Exception as e:
logger.error(f"获取商品信息失败: {e}")
return None
def update_item_multi_spec_status(self, cookie_id: str, item_id: str, is_multi_spec: bool) -> bool:
"""更新商品的多规格状态"""
try:
with self.lock:
cursor = self.conn.cursor()
cursor.execute('''
UPDATE item_info
SET is_multi_spec = ?, updated_at = CURRENT_TIMESTAMP
WHERE cookie_id = ? AND item_id = ?
''', (is_multi_spec, cookie_id, item_id))
if cursor.rowcount > 0:
self.conn.commit()
logger.info(f"更新商品多规格状态成功: {item_id} -> {is_multi_spec}")
return True
else:
logger.warning(f"商品不存在,无法更新多规格状态: {item_id}")
return False
except Exception as e:
logger.error(f"更新商品多规格状态失败: {e}")
self.conn.rollback()
return False
def get_item_multi_spec_status(self, cookie_id: str, item_id: str) -> bool:
"""获取商品的多规格状态"""
try:
with self.lock:
cursor = self.conn.cursor()
cursor.execute('''
SELECT is_multi_spec FROM item_info
WHERE cookie_id = ? AND item_id = ?
''', (cookie_id, item_id))
row = cursor.fetchone()
if row:
return bool(row[0]) if row[0] is not None else False
return False
except Exception as e:
logger.error(f"获取商品多规格状态失败: {e}")
return False
def get_items_by_cookie(self, cookie_id: str) -> List[Dict]:
"""获取指定Cookie的所有商品信息
Args:
cookie_id: Cookie ID
Returns:
List[Dict]: 商品信息列表
"""
try:
with self.lock:
cursor = self.conn.cursor()
cursor.execute('''
SELECT * FROM item_info
WHERE cookie_id = ?
ORDER BY updated_at DESC
''', (cookie_id,))
columns = [description[0] for description in cursor.description]
items = []
for row in cursor.fetchall():
item_info = dict(zip(columns, row))
# 解析item_detail JSON
if item_info.get('item_detail'):
try:
item_info['item_detail_parsed'] = json.loads(item_info['item_detail'])
except:
item_info['item_detail_parsed'] = {}
items.append(item_info)
return items
except Exception as e:
logger.error(f"获取Cookie商品信息失败: {e}")
return []
def get_all_items(self) -> List[Dict]:
"""获取所有商品信息
Returns:
List[Dict]: 所有商品信息列表
"""
try:
with self.lock:
cursor = self.conn.cursor()
cursor.execute('''
SELECT * FROM item_info
ORDER BY updated_at DESC
''')
columns = [description[0] for description in cursor.description]
items = []
for row in cursor.fetchall():
item_info = dict(zip(columns, row))
# 解析item_detail JSON
if item_info.get('item_detail'):
try:
item_info['item_detail_parsed'] = json.loads(item_info['item_detail'])
except:
item_info['item_detail_parsed'] = {}
items.append(item_info)
return items
except Exception as e:
logger.error(f"获取所有商品信息失败: {e}")
return []
def update_item_detail(self, cookie_id: str, item_id: str, item_detail: str) -> bool:
"""更新商品详情(不覆盖商品标题等基本信息)
Args:
cookie_id: Cookie ID
item_id: 商品ID
item_detail: 商品详情JSON字符串
Returns:
bool: 操作是否成功
"""
try:
with self.lock:
cursor = self.conn.cursor()
# 只更新item_detail字段不影响其他字段
cursor.execute('''
UPDATE item_info SET
item_detail = ?, updated_at = CURRENT_TIMESTAMP
WHERE cookie_id = ? AND item_id = ?
''', (item_detail, cookie_id, item_id))
if cursor.rowcount > 0:
self.conn.commit()
logger.info(f"更新商品详情成功: {item_id}")
return True
else:
logger.warning(f"未找到要更新的商品: {item_id}")
return False
except Exception as e:
logger.error(f"更新商品详情失败: {e}")
self.conn.rollback()
return False
def update_item_title_only(self, cookie_id: str, item_id: str, item_title: str) -> bool:
"""仅更新商品标题(并发安全)
Args:
cookie_id: Cookie ID
item_id: 商品ID
item_title: 商品标题
Returns:
bool: 操作是否成功
"""
try:
with self.lock:
cursor = self.conn.cursor()
# 使用 INSERT OR REPLACE 确保记录存在,但只更新标题字段
cursor.execute('''
INSERT INTO item_info (cookie_id, item_id, item_title, item_description,
item_category, item_price, item_detail, created_at, updated_at)
VALUES (?, ?, ?,
COALESCE((SELECT item_description FROM item_info WHERE cookie_id = ? AND item_id = ?), ''),
COALESCE((SELECT item_category FROM item_info WHERE cookie_id = ? AND item_id = ?), ''),
COALESCE((SELECT item_price FROM item_info WHERE cookie_id = ? AND item_id = ?), ''),
COALESCE((SELECT item_detail FROM item_info WHERE cookie_id = ? AND item_id = ?), ''),
COALESCE((SELECT created_at FROM item_info WHERE cookie_id = ? AND item_id = ?), CURRENT_TIMESTAMP),
CURRENT_TIMESTAMP)
ON CONFLICT(cookie_id, item_id) DO UPDATE SET
item_title = excluded.item_title,
updated_at = CURRENT_TIMESTAMP
''', (cookie_id, item_id, item_title,
cookie_id, item_id, cookie_id, item_id, cookie_id, item_id,
cookie_id, item_id, cookie_id, item_id))
self.conn.commit()
logger.info(f"更新商品标题成功: {item_id} - {item_title}")
return True
except Exception as e:
logger.error(f"更新商品标题失败: {e}")
self.conn.rollback()
return False
def batch_save_item_basic_info(self, items_data: list) -> int:
"""批量保存商品基本信息(并发安全)
Args:
items_data: 商品数据列表,每个元素包含 cookie_id, item_id, item_title 等字段
Returns:
int: 成功保存的商品数量
"""
if not items_data:
return 0
success_count = 0
try:
with self.lock:
cursor = self.conn.cursor()
# 使用事务批量处理
cursor.execute('BEGIN TRANSACTION')
for item_data in items_data:
try:
cookie_id = item_data.get('cookie_id')
item_id = item_data.get('item_id')
item_title = item_data.get('item_title', '')
item_description = item_data.get('item_description', '')
item_category = item_data.get('item_category', '')
item_price = item_data.get('item_price', '')
item_detail = item_data.get('item_detail', '')
if not cookie_id or not item_id:
continue
# 使用 INSERT OR IGNORE + UPDATE 模式
cursor.execute('''
INSERT OR IGNORE INTO item_info (cookie_id, item_id, item_title, item_description,
item_category, item_price, item_detail, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
''', (cookie_id, item_id, item_title, item_description,
item_category, item_price, item_detail))
if cursor.rowcount == 0:
# 记录已存在,进行条件更新
update_sql = '''
UPDATE item_info SET
item_title = CASE WHEN (item_title IS NULL OR item_title = '') AND ? != '' THEN ? ELSE item_title END,
item_description = CASE WHEN (item_description IS NULL OR item_description = '') AND ? != '' THEN ? ELSE item_description END,
item_category = CASE WHEN (item_category IS NULL OR item_category = '') AND ? != '' THEN ? ELSE item_category END,
item_price = CASE WHEN (item_price IS NULL OR item_price = '') AND ? != '' THEN ? ELSE item_price END,
item_detail = CASE WHEN (item_detail IS NULL OR item_detail = '' OR TRIM(item_detail) = '') AND ? != '' THEN ? ELSE item_detail END,
updated_at = CURRENT_TIMESTAMP
WHERE cookie_id = ? AND item_id = ?
'''
self._execute_sql(cursor, update_sql, (
item_title, item_title,
item_description, item_description,
item_category, item_category,
item_price, item_price,
item_detail, item_detail,
cookie_id, item_id
))
success_count += 1
except Exception as item_e:
logger.warning(f"批量保存单个商品失败 {item_data.get('item_id', 'unknown')}: {item_e}")
continue
cursor.execute('COMMIT')
logger.info(f"批量保存商品信息完成: {success_count}/{len(items_data)} 个商品")
return success_count
except Exception as e:
logger.error(f"批量保存商品信息失败: {e}")
try:
cursor.execute('ROLLBACK')
except:
pass
return success_count
def delete_item_info(self, cookie_id: str, item_id: str) -> bool:
"""删除商品信息
Args:
cookie_id: Cookie ID
item_id: 商品ID
Returns:
bool: 操作是否成功
"""
try:
with self.lock:
cursor = self.conn.cursor()
cursor.execute('DELETE FROM item_info WHERE cookie_id = ? AND item_id = ?',
(cookie_id, item_id))
if cursor.rowcount > 0:
self.conn.commit()
logger.info(f"删除商品信息成功: {cookie_id} - {item_id}")
return True
else:
logger.warning(f"未找到要删除的商品信息: {cookie_id} - {item_id}")
return False
except Exception as e:
logger.error(f"删除商品信息失败: {e}")
self.conn.rollback()
return False
def batch_delete_item_info(self, items_to_delete: list) -> int:
"""批量删除商品信息
Args:
items_to_delete: 要删除的商品列表,每个元素包含 cookie_id 和 item_id
Returns:
int: 成功删除的商品数量
"""
if not items_to_delete:
return 0
success_count = 0
try:
with self.lock:
cursor = self.conn.cursor()
cursor.execute('BEGIN TRANSACTION')
for item_data in items_to_delete:
try:
cookie_id = item_data.get('cookie_id')
item_id = item_data.get('item_id')
if not cookie_id or not item_id:
continue
cursor.execute('DELETE FROM item_info WHERE cookie_id = ? AND item_id = ?',
(cookie_id, item_id))
if cursor.rowcount > 0:
success_count += 1
logger.debug(f"删除商品信息: {cookie_id} - {item_id}")
except Exception as item_e:
logger.warning(f"删除单个商品失败 {item_data.get('item_id', 'unknown')}: {item_e}")
continue
cursor.execute('COMMIT')
logger.info(f"批量删除商品信息完成: {success_count}/{len(items_to_delete)} 个商品")
return success_count
except Exception as e:
logger.error(f"批量删除商品信息失败: {e}")
try:
cursor.execute('ROLLBACK')
except:
pass
return success_count
# ==================== 用户设置管理方法 ====================
def get_user_settings(self, user_id: int):
"""获取用户的所有设置"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
SELECT key, value, description, updated_at
FROM user_settings
WHERE user_id = ?
ORDER BY key
''', (user_id,))
settings = {}
for row in cursor.fetchall():
settings[row[0]] = {
'value': row[1],
'description': row[2],
'updated_at': row[3]
}
return settings
except Exception as e:
logger.error(f"获取用户设置失败: {e}")
return {}
def get_user_setting(self, user_id: int, key: str):
"""获取用户的特定设置"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
SELECT value, description, updated_at
FROM user_settings
WHERE user_id = ? AND key = ?
''', (user_id, key))
row = cursor.fetchone()
if row:
return {
'key': key,
'value': row[0],
'description': row[1],
'updated_at': row[2]
}
return None
except Exception as e:
logger.error(f"获取用户设置失败: {e}")
return None
def set_user_setting(self, user_id: int, key: str, value: str, description: str = None):
"""设置用户配置"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO user_settings (user_id, key, value, description, updated_at)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
''', (user_id, key, value, description))
self.conn.commit()
logger.info(f"用户设置更新成功: user_id={user_id}, key={key}")
return True
except Exception as e:
logger.error(f"设置用户配置失败: {e}")
self.conn.rollback()
return False
# ==================== 管理员专用方法 ====================
def get_all_users(self):
"""获取所有用户信息(管理员专用)"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
SELECT id, username, email, created_at, updated_at
FROM users
ORDER BY created_at DESC
''')
users = []
for row in cursor.fetchall():
users.append({
'id': row[0],
'username': row[1],
'email': row[2],
'created_at': row[3],
'updated_at': row[4]
})
return users
except Exception as e:
logger.error(f"获取所有用户失败: {e}")
return []
def get_user_by_id(self, user_id: int):
"""根据ID获取用户信息"""
with self.lock:
try:
cursor = self.conn.cursor()
cursor.execute('''
SELECT id, username, email, created_at, updated_at
FROM users
WHERE id = ?
''', (user_id,))
row = cursor.fetchone()
if row:
return {
'id': row[0],
'username': row[1],
'email': row[2],
'created_at': row[3],
'updated_at': row[4]
}
return None
except Exception as e:
logger.error(f"获取用户信息失败: {e}")
return None
def delete_user_and_data(self, user_id: int):
"""删除用户及其所有相关数据"""
with self.lock:
try:
cursor = self.conn.cursor()
# 开始事务
cursor.execute('BEGIN TRANSACTION')
# 删除用户相关的所有数据
# 1. 删除用户设置
cursor.execute('DELETE FROM user_settings WHERE user_id = ?', (user_id,))
# 2. 删除用户的卡券
cursor.execute('DELETE FROM cards WHERE user_id = ?', (user_id,))
# 3. 删除用户的发货规则
cursor.execute('DELETE FROM delivery_rules WHERE user_id = ?', (user_id,))
# 4. 删除用户的通知渠道
cursor.execute('DELETE FROM notification_channels WHERE user_id = ?', (user_id,))
# 5. 删除用户的Cookie
cursor.execute('DELETE FROM cookies WHERE user_id = ?', (user_id,))
# 6. 删除用户的关键字
cursor.execute('DELETE FROM keywords WHERE cookie_id IN (SELECT id FROM cookies WHERE user_id = ?)', (user_id,))
# 7. 删除用户的默认回复
cursor.execute('DELETE FROM default_replies WHERE cookie_id IN (SELECT id FROM cookies WHERE user_id = ?)', (user_id,))
# 8. 删除用户的AI回复设置
cursor.execute('DELETE FROM ai_reply_settings WHERE cookie_id IN (SELECT id FROM cookies WHERE user_id = ?)', (user_id,))
# 9. 删除用户的消息通知
cursor.execute('DELETE FROM message_notifications WHERE cookie_id IN (SELECT id FROM cookies WHERE user_id = ?)', (user_id,))
# 10. 最后删除用户本身
cursor.execute('DELETE FROM users WHERE id = ?', (user_id,))
# 提交事务
cursor.execute('COMMIT')
logger.info(f"用户及相关数据删除成功: user_id={user_id}")
return True
except Exception as e:
# 回滚事务
cursor.execute('ROLLBACK')
logger.error(f"删除用户及相关数据失败: {e}")
return False
def get_table_data(self, table_name: str):
"""获取指定表的所有数据"""
with self.lock:
try:
cursor = self.conn.cursor()
# 获取表结构
cursor.execute(f"PRAGMA table_info({table_name})")
columns_info = cursor.fetchall()
columns = [col[1] for col in columns_info] # 列名
# 获取表数据
cursor.execute(f"SELECT * FROM {table_name}")
rows = cursor.fetchall()
# 转换为字典列表
data = []
for row in rows:
row_dict = {}
for i, value in enumerate(row):
row_dict[columns[i]] = value
data.append(row_dict)
return data, columns
except Exception as e:
logger.error(f"获取表数据失败: {table_name} - {e}")
return [], []
def delete_table_record(self, table_name: str, record_id: str):
"""删除指定表的指定记录"""
with self.lock:
try:
cursor = self.conn.cursor()
# 根据表名确定主键字段
primary_key_map = {
'users': 'id',
'cookies': 'id',
'keywords': 'id',
'default_replies': 'id',
'ai_reply_settings': 'id',
'message_notifications': 'id',
'cards': 'id',
'delivery_rules': 'id',
'notification_channels': 'id',
'user_settings': 'id',
'email_verifications': 'id',
'captcha_codes': 'id'
}
primary_key = primary_key_map.get(table_name, 'id')
# 删除记录
cursor.execute(f"DELETE FROM {table_name} WHERE {primary_key} = ?", (record_id,))
if cursor.rowcount > 0:
self.conn.commit()
logger.info(f"删除表记录成功: {table_name}.{record_id}")
return True
else:
logger.warning(f"删除表记录失败,记录不存在: {table_name}.{record_id}")
return False
except Exception as e:
logger.error(f"删除表记录失败: {table_name}.{record_id} - {e}")
self.conn.rollback()
return False
def clear_table_data(self, table_name: str):
"""清空指定表的所有数据"""
with self.lock:
try:
cursor = self.conn.cursor()
# 清空表数据
cursor.execute(f"DELETE FROM {table_name}")
# 重置自增ID如果有的话
cursor.execute(f"DELETE FROM sqlite_sequence WHERE name = ?", (table_name,))
self.conn.commit()
logger.info(f"清空表数据成功: {table_name}")
return True
except Exception as e:
logger.error(f"清空表数据失败: {table_name} - {e}")
self.conn.rollback()
return False
# 全局单例
db_manager = DBManager()
# 确保进程结束时关闭数据库连接
import atexit
atexit.register(db_manager.close)