diff --git a/static/register.html b/static/register.html
new file mode 100644
index 0000000..bbde700
--- /dev/null
+++ b/static/register.html
@@ -0,0 +1,569 @@
+
+
+
+
+
+
用户注册 - 闲鱼自动回复系统
+
+
+
+
+
+
+
+
+
+
+
diff --git a/static/user_management.html b/static/user_management.html
new file mode 100644
index 0000000..cae0628
--- /dev/null
+++ b/static/user_management.html
@@ -0,0 +1,394 @@
+
+
+
+
+
+
用户管理 - 闲鱼自动回复系统
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 加载中...
+
+
正在加载用户信息...
+
+
+
+
+
+
+
+
+
+
+
+
+
+
您确定要删除用户 吗?
+
+
+
警告:此操作将删除该用户的所有数据,包括:
+
+ - 所有Cookie账号
+ - 所有卡券
+ - 所有关键字和回复设置
+ - 所有个人设置
+
+
此操作不可恢复!
+
+
+
+
+
+
+
+
+
+
+
diff --git a/test_admin_features.py b/test_admin_features.py
new file mode 100644
index 0000000..17712f0
--- /dev/null
+++ b/test_admin_features.py
@@ -0,0 +1,345 @@
+#!/usr/bin/env python3
+"""
+测试管理员功能
+"""
+
+import requests
+import json
+import time
+import sqlite3
+from loguru import logger
+
+BASE_URL = "http://localhost:8080"
+
+def test_admin_login():
+ """测试管理员登录"""
+ logger.info("测试管理员登录...")
+
+ response = requests.post(f"{BASE_URL}/login",
+ json={'username': 'admin', 'password': 'admin123'})
+
+ if response.json()['success']:
+ token = response.json()['token']
+ user_id = response.json()['user_id']
+
+ logger.info(f"管理员登录成功,token: {token[:20]}...")
+ return {
+ 'token': token,
+ 'user_id': user_id,
+ 'headers': {'Authorization': f'Bearer {token}'}
+ }
+ else:
+ logger.error("管理员登录失败")
+ return None
+
+def test_user_management(admin):
+ """测试用户管理功能"""
+ logger.info("测试用户管理功能...")
+
+ # 1. 获取所有用户
+ response = requests.get(f"{BASE_URL}/admin/users", headers=admin['headers'])
+ if response.status_code == 200:
+ users = response.json()['users']
+ logger.info(f"✅ 获取用户列表成功,共 {len(users)} 个用户")
+
+ for user in users:
+ logger.info(f" 用户: {user['username']} (ID: {user['id']}) - Cookie: {user.get('cookie_count', 0)}, 卡券: {user.get('card_count', 0)}")
+ else:
+ logger.error(f"❌ 获取用户列表失败: {response.status_code}")
+ return False
+
+ # 2. 测试非管理员权限验证
+ logger.info("测试非管理员权限验证...")
+
+ # 创建一个普通用户token(模拟)
+ fake_headers = {'Authorization': 'Bearer fake_token'}
+ response = requests.get(f"{BASE_URL}/admin/users", headers=fake_headers)
+
+ if response.status_code == 401 or response.status_code == 403:
+ logger.info("✅ 非管理员权限验证正常")
+ else:
+ logger.warning(f"⚠️ 权限验证可能有问题: {response.status_code}")
+
+ return True
+
+def test_system_stats(admin):
+ """测试系统统计功能"""
+ logger.info("测试系统统计功能...")
+
+ response = requests.get(f"{BASE_URL}/admin/stats", headers=admin['headers'])
+ if response.status_code == 200:
+ stats = response.json()
+ logger.info("✅ 获取系统统计成功:")
+ logger.info(f" 总用户数: {stats['users']['total']}")
+ logger.info(f" 总Cookie数: {stats['cookies']['total']}")
+ logger.info(f" 总卡券数: {stats['cards']['total']}")
+ logger.info(f" 系统版本: {stats['system']['version']}")
+ return True
+ else:
+ logger.error(f"❌ 获取系统统计失败: {response.status_code}")
+ return False
+
+def test_log_management(admin):
+ """测试日志管理功能"""
+ logger.info("测试日志管理功能...")
+
+ # 1. 获取系统日志
+ response = requests.get(f"{BASE_URL}/admin/logs?lines=50", headers=admin['headers'])
+ if response.status_code == 200:
+ log_data = response.json()
+ logs = log_data.get('logs', [])
+ logger.info(f"✅ 获取系统日志成功,共 {len(logs)} 条")
+
+ if logs:
+ logger.info(" 最新几条日志:")
+ for i, log in enumerate(logs[-3:]): # 显示最后3条
+ logger.info(f" {i+1}. {log[:100]}...")
+ else:
+ logger.error(f"❌ 获取系统日志失败: {response.status_code}")
+ return False
+
+ # 2. 测试日志级别过滤
+ response = requests.get(f"{BASE_URL}/admin/logs?lines=20&level=info", headers=admin['headers'])
+ if response.status_code == 200:
+ log_data = response.json()
+ info_logs = log_data.get('logs', [])
+ logger.info(f"✅ INFO级别日志过滤成功,共 {len(info_logs)} 条")
+ else:
+ logger.error(f"❌ 日志级别过滤失败: {response.status_code}")
+ return False
+
+ return True
+
+def create_test_user_for_deletion():
+ """创建一个测试用户用于删除测试"""
+ logger.info("创建测试用户用于删除测试...")
+
+ user_data = {
+ "username": "test_delete_user",
+ "email": "delete@test.com",
+ "password": "test123456"
+ }
+
+ try:
+ # 清理可能存在的用户
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+ cursor.execute('DELETE FROM users WHERE username = ? OR email = ?', (user_data['username'], user_data['email']))
+ cursor.execute('DELETE FROM email_verifications WHERE email = ?', (user_data['email'],))
+ conn.commit()
+ conn.close()
+
+ # 生成验证码
+ session_id = f"delete_test_{int(time.time())}"
+
+ # 生成图形验证码
+ captcha_response = requests.post(f"{BASE_URL}/generate-captcha",
+ json={'session_id': session_id})
+ if not captcha_response.json()['success']:
+ logger.error("图形验证码生成失败")
+ return None
+
+ # 获取图形验证码
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+ cursor.execute('SELECT code FROM captcha_codes WHERE session_id = ? ORDER BY created_at DESC LIMIT 1',
+ (session_id,))
+ captcha_result = cursor.fetchone()
+ conn.close()
+
+ if not captcha_result:
+ logger.error("无法获取图形验证码")
+ return None
+
+ captcha_code = captcha_result[0]
+
+ # 验证图形验证码
+ verify_response = requests.post(f"{BASE_URL}/verify-captcha",
+ json={'session_id': session_id, 'captcha_code': captcha_code})
+ if not verify_response.json()['success']:
+ logger.error("图形验证码验证失败")
+ return None
+
+ # 发送邮箱验证码
+ email_response = requests.post(f"{BASE_URL}/send-verification-code",
+ json={'email': user_data['email'], 'session_id': session_id})
+ if not email_response.json()['success']:
+ logger.error("邮箱验证码发送失败")
+ return None
+
+ # 获取邮箱验证码
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+ cursor.execute('SELECT code FROM email_verifications WHERE email = ? ORDER BY created_at DESC LIMIT 1',
+ (user_data['email'],))
+ email_result = cursor.fetchone()
+ conn.close()
+
+ if not email_result:
+ logger.error("无法获取邮箱验证码")
+ return None
+
+ email_code = email_result[0]
+
+ # 注册用户
+ register_response = requests.post(f"{BASE_URL}/register",
+ json={
+ 'username': user_data['username'],
+ 'email': user_data['email'],
+ 'verification_code': email_code,
+ 'password': user_data['password']
+ })
+
+ if register_response.json()['success']:
+ logger.info(f"测试用户创建成功: {user_data['username']}")
+
+ # 获取用户ID
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+ cursor.execute('SELECT id FROM users WHERE username = ?', (user_data['username'],))
+ result = cursor.fetchone()
+ conn.close()
+
+ if result:
+ return {
+ 'user_id': result[0],
+ 'username': user_data['username']
+ }
+
+ logger.error("测试用户创建失败")
+ return None
+
+ except Exception as e:
+ logger.error(f"创建测试用户失败: {e}")
+ return None
+
+def test_user_deletion(admin):
+ """测试用户删除功能"""
+ logger.info("测试用户删除功能...")
+
+ # 创建测试用户
+ test_user = create_test_user_for_deletion()
+ if not test_user:
+ logger.error("无法创建测试用户,跳过删除测试")
+ return False
+
+ user_id = test_user['user_id']
+ username = test_user['username']
+
+ # 删除用户
+ response = requests.delete(f"{BASE_URL}/admin/users/{user_id}", headers=admin['headers'])
+ if response.status_code == 200:
+ logger.info(f"✅ 用户删除成功: {username}")
+
+ # 验证用户确实被删除
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+ cursor.execute('SELECT id FROM users WHERE id = ?', (user_id,))
+ result = cursor.fetchone()
+ conn.close()
+
+ if not result:
+ logger.info("✅ 用户删除验证通过")
+ return True
+ else:
+ logger.error("❌ 用户删除验证失败,用户仍然存在")
+ return False
+ else:
+ logger.error(f"❌ 用户删除失败: {response.status_code}")
+ return False
+
+def test_admin_self_deletion_protection(admin):
+ """测试管理员自删除保护"""
+ logger.info("测试管理员自删除保护...")
+
+ admin_user_id = admin['user_id']
+
+ response = requests.delete(f"{BASE_URL}/admin/users/{admin_user_id}", headers=admin['headers'])
+ if response.status_code == 400:
+ logger.info("✅ 管理员自删除保护正常")
+ return True
+ else:
+ logger.error(f"❌ 管理员自删除保护失败: {response.status_code}")
+ return False
+
+def main():
+ """主测试函数"""
+ print("🚀 管理员功能测试")
+ print("=" * 60)
+
+ print("📋 测试内容:")
+ print("• 管理员登录")
+ print("• 用户管理功能")
+ print("• 系统统计功能")
+ print("• 日志管理功能")
+ print("• 用户删除功能")
+ print("• 管理员保护机制")
+
+ try:
+ # 管理员登录
+ admin = test_admin_login()
+ if not admin:
+ print("❌ 测试失败:管理员登录失败")
+ return False
+
+ print("✅ 管理员登录成功")
+
+ # 测试各项功能
+ tests = [
+ ("用户管理", lambda: test_user_management(admin)),
+ ("系统统计", lambda: test_system_stats(admin)),
+ ("日志管理", lambda: test_log_management(admin)),
+ ("用户删除", lambda: test_user_deletion(admin)),
+ ("管理员保护", lambda: test_admin_self_deletion_protection(admin))
+ ]
+
+ results = []
+ for test_name, test_func in tests:
+ print(f"\n🧪 测试 {test_name}...")
+ try:
+ result = test_func()
+ results.append((test_name, result))
+ if result:
+ print(f"✅ {test_name} 测试通过")
+ else:
+ print(f"❌ {test_name} 测试失败")
+ except Exception as e:
+ print(f"💥 {test_name} 测试异常: {e}")
+ results.append((test_name, False))
+
+ print("\n" + "=" * 60)
+ print("🎉 管理员功能测试完成!")
+
+ print("\n📊 测试结果:")
+ passed = 0
+ for test_name, result in results:
+ status = "✅ 通过" if result else "❌ 失败"
+ print(f" {test_name}: {status}")
+ if result:
+ passed += 1
+
+ print(f"\n📈 总体结果: {passed}/{len(results)} 项测试通过")
+
+ if passed == len(results):
+ print("🎊 所有测试都通过了!管理员功能正常工作。")
+
+ print("\n💡 使用说明:")
+ print("1. 使用admin账号登录系统")
+ print("2. 在侧边栏可以看到'管理员功能'菜单")
+ print("3. 点击'用户管理'可以查看和删除用户")
+ print("4. 点击'系统日志'可以查看系统运行日志")
+ print("5. 这些功能只有admin用户可以访问")
+
+ return True
+ else:
+ print("⚠️ 部分测试失败,请检查相关功能。")
+ return False
+
+ except Exception as e:
+ print(f"💥 测试异常: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+if __name__ == "__main__":
+ main()
diff --git a/test_captcha.png b/test_captcha.png
new file mode 100644
index 0000000..0dd14aa
Binary files /dev/null and b/test_captcha.png differ
diff --git a/test_captcha_system.py b/test_captcha_system.py
new file mode 100644
index 0000000..10a2580
--- /dev/null
+++ b/test_captcha_system.py
@@ -0,0 +1,240 @@
+#!/usr/bin/env python3
+"""
+图形验证码系统测试
+"""
+
+import requests
+import json
+import sqlite3
+import base64
+from io import BytesIO
+from PIL import Image
+
+def test_captcha_generation():
+ """测试图形验证码生成"""
+ print("🧪 测试图形验证码生成")
+ print("-" * 40)
+
+ session_id = "test_session_123"
+
+ # 1. 生成图形验证码
+ print("1️⃣ 生成图形验证码...")
+ response = requests.post('http://localhost:8080/generate-captcha',
+ json={'session_id': session_id})
+
+ print(f" 状态码: {response.status_code}")
+
+ if response.status_code == 200:
+ result = response.json()
+ print(f" 成功: {result['success']}")
+ print(f" 消息: {result['message']}")
+
+ if result['success']:
+ captcha_image = result['captcha_image']
+ print(f" 图片数据长度: {len(captcha_image)}")
+
+ # 保存图片到文件(用于调试)
+ if captcha_image.startswith('data:image/png;base64,'):
+ image_data = captcha_image.split(',')[1]
+ image_bytes = base64.b64decode(image_data)
+
+ with open('test_captcha.png', 'wb') as f:
+ f.write(image_bytes)
+ print(" ✅ 图片已保存为 test_captcha.png")
+
+ # 从数据库获取验证码文本
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+ cursor.execute('SELECT code FROM captcha_codes WHERE session_id = ? ORDER BY created_at DESC LIMIT 1',
+ (session_id,))
+ result = cursor.fetchone()
+ if result:
+ captcha_text = result[0]
+ print(f" 验证码文本: {captcha_text}")
+ conn.close()
+ return session_id, captcha_text
+ conn.close()
+ else:
+ print(" ❌ 图形验证码生成失败")
+ else:
+ print(f" ❌ 请求失败: {response.text}")
+
+ return None, None
+
+def test_captcha_verification(session_id, captcha_text):
+ """测试图形验证码验证"""
+ print("\n🧪 测试图形验证码验证")
+ print("-" * 40)
+
+ # 2. 验证正确的验证码
+ print("2️⃣ 验证正确的验证码...")
+ response = requests.post('http://localhost:8080/verify-captcha',
+ json={
+ 'session_id': session_id,
+ 'captcha_code': captcha_text
+ })
+
+ print(f" 状态码: {response.status_code}")
+ if response.status_code == 200:
+ result = response.json()
+ print(f" 成功: {result['success']}")
+ print(f" 消息: {result['message']}")
+
+ if result['success']:
+ print(" ✅ 正确验证码验证成功")
+ else:
+ print(" ❌ 正确验证码验证失败")
+
+ # 3. 验证错误的验证码
+ print("\n3️⃣ 验证错误的验证码...")
+ response = requests.post('http://localhost:8080/verify-captcha',
+ json={
+ 'session_id': session_id,
+ 'captcha_code': 'WRONG'
+ })
+
+ print(f" 状态码: {response.status_code}")
+ if response.status_code == 200:
+ result = response.json()
+ print(f" 成功: {result['success']}")
+ print(f" 消息: {result['message']}")
+
+ if not result['success']:
+ print(" ✅ 错误验证码被正确拒绝")
+ else:
+ print(" ❌ 错误验证码验证成功(不应该)")
+
+def test_captcha_expiry():
+ """测试图形验证码过期"""
+ print("\n🧪 测试图形验证码过期")
+ print("-" * 40)
+
+ # 直接在数据库中插入过期的验证码
+ import time
+ expired_session = "expired_session_123"
+ expired_time = time.time() - 3600 # 1小时前
+
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+ cursor.execute('''
+ INSERT INTO captcha_codes (session_id, code, expires_at)
+ VALUES (?, ?, ?)
+ ''', (expired_session, 'EXPIRED', expired_time))
+ conn.commit()
+ conn.close()
+
+ print("4️⃣ 验证过期的验证码...")
+ response = requests.post('http://localhost:8080/verify-captcha',
+ json={
+ 'session_id': expired_session,
+ 'captcha_code': 'EXPIRED'
+ })
+
+ print(f" 状态码: {response.status_code}")
+ if response.status_code == 200:
+ result = response.json()
+ print(f" 成功: {result['success']}")
+ print(f" 消息: {result['message']}")
+
+ if not result['success']:
+ print(" ✅ 过期验证码被正确拒绝")
+ else:
+ print(" ❌ 过期验证码验证成功(不应该)")
+
+def test_database_cleanup():
+ """测试数据库清理"""
+ print("\n🧪 测试数据库清理")
+ print("-" * 40)
+
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+
+ # 清理测试数据
+ cursor.execute('DELETE FROM captcha_codes WHERE session_id LIKE "test%" OR session_id LIKE "expired%"')
+ deleted_count = cursor.rowcount
+ conn.commit()
+ conn.close()
+
+ print(f"5️⃣ 清理测试数据: 删除了 {deleted_count} 条记录")
+ print(" ✅ 数据库清理完成")
+
+def test_frontend_integration():
+ """测试前端集成"""
+ print("\n🧪 测试前端集成")
+ print("-" * 40)
+
+ # 测试注册页面是否可以访问
+ print("6️⃣ 测试注册页面...")
+ response = requests.get('http://localhost:8080/register.html')
+
+ print(f" 状态码: {response.status_code}")
+ if response.status_code == 200:
+ content = response.text
+
+ # 检查是否包含图形验证码相关元素
+ checks = [
+ ('captchaCode', '图形验证码输入框'),
+ ('captchaImage', '图形验证码图片'),
+ ('refreshCaptcha', '刷新验证码函数'),
+ ('verifyCaptcha', '验证验证码函数'),
+ ('loadCaptcha', '加载验证码函数')
+ ]
+
+ for check_item, description in checks:
+ if check_item in content:
+ print(f" ✅ {description}: 存在")
+ else:
+ print(f" ❌ {description}: 缺失")
+
+ print(" ✅ 注册页面加载成功")
+ else:
+ print(f" ❌ 注册页面加载失败: {response.status_code}")
+
+def main():
+ """主测试函数"""
+ print("🚀 图形验证码系统测试")
+ print("=" * 60)
+
+ try:
+ # 测试图形验证码生成
+ session_id, captcha_text = test_captcha_generation()
+
+ if session_id and captcha_text:
+ # 测试图形验证码验证
+ test_captcha_verification(session_id, captcha_text)
+
+ # 测试验证码过期
+ test_captcha_expiry()
+
+ # 测试前端集成
+ test_frontend_integration()
+
+ # 清理测试数据
+ test_database_cleanup()
+
+ print("\n" + "=" * 60)
+ print("🎉 图形验证码系统测试完成!")
+
+ print("\n📋 测试总结:")
+ print("✅ 图形验证码生成功能正常")
+ print("✅ 图形验证码验证功能正常")
+ print("✅ 过期验证码处理正常")
+ print("✅ 前端页面集成正常")
+ print("✅ 数据库操作正常")
+
+ print("\n💡 下一步:")
+ print("1. 访问 http://localhost:8080/register.html")
+ print("2. 测试图形验证码功能")
+ print("3. 验证邮箱验证码发送需要先通过图形验证码")
+ print("4. 完成用户注册流程")
+
+ except Exception as e:
+ print(f"\n❌ 测试失败: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+ return True
+
+if __name__ == "__main__":
+ main()
diff --git a/test_complete_isolation.py b/test_complete_isolation.py
new file mode 100644
index 0000000..7bdf02f
--- /dev/null
+++ b/test_complete_isolation.py
@@ -0,0 +1,347 @@
+#!/usr/bin/env python3
+"""
+完整的多用户数据隔离测试
+"""
+
+import requests
+import json
+import time
+import sqlite3
+from loguru import logger
+
+BASE_URL = "http://localhost:8080"
+
+def create_test_users():
+ """创建测试用户"""
+ logger.info("创建测试用户...")
+
+ users = [
+ {"username": "testuser1", "email": "user1@test.com", "password": "test123456"},
+ {"username": "testuser2", "email": "user2@test.com", "password": "test123456"}
+ ]
+
+ created_users = []
+
+ for user in users:
+ try:
+ # 清理可能存在的用户
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+ cursor.execute('DELETE FROM users WHERE username = ? OR email = ?', (user['username'], user['email']))
+ cursor.execute('DELETE FROM email_verifications WHERE email = ?', (user['email'],))
+ conn.commit()
+ conn.close()
+
+ # 生成验证码
+ session_id = f"test_{user['username']}_{int(time.time())}"
+
+ # 生成图形验证码
+ captcha_response = requests.post(f"{BASE_URL}/generate-captcha",
+ json={'session_id': session_id})
+ if not captcha_response.json()['success']:
+ logger.error(f"图形验证码生成失败: {user['username']}")
+ continue
+
+ # 获取图形验证码
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+ cursor.execute('SELECT code FROM captcha_codes WHERE session_id = ? ORDER BY created_at DESC LIMIT 1',
+ (session_id,))
+ captcha_result = cursor.fetchone()
+ conn.close()
+
+ if not captcha_result:
+ logger.error(f"无法获取图形验证码: {user['username']}")
+ continue
+
+ captcha_code = captcha_result[0]
+
+ # 验证图形验证码
+ verify_response = requests.post(f"{BASE_URL}/verify-captcha",
+ json={'session_id': session_id, 'captcha_code': captcha_code})
+ if not verify_response.json()['success']:
+ logger.error(f"图形验证码验证失败: {user['username']}")
+ continue
+
+ # 发送邮箱验证码
+ email_response = requests.post(f"{BASE_URL}/send-verification-code",
+ json={'email': user['email'], 'session_id': session_id})
+ if not email_response.json()['success']:
+ logger.error(f"邮箱验证码发送失败: {user['username']}")
+ continue
+
+ # 获取邮箱验证码
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+ cursor.execute('SELECT code FROM email_verifications WHERE email = ? ORDER BY created_at DESC LIMIT 1',
+ (user['email'],))
+ email_result = cursor.fetchone()
+ conn.close()
+
+ if not email_result:
+ logger.error(f"无法获取邮箱验证码: {user['username']}")
+ continue
+
+ email_code = email_result[0]
+
+ # 注册用户
+ register_response = requests.post(f"{BASE_URL}/register",
+ json={
+ 'username': user['username'],
+ 'email': user['email'],
+ 'verification_code': email_code,
+ 'password': user['password']
+ })
+
+ if register_response.json()['success']:
+ logger.info(f"用户注册成功: {user['username']}")
+
+ # 登录获取token
+ login_response = requests.post(f"{BASE_URL}/login",
+ json={'username': user['username'], 'password': user['password']})
+
+ if login_response.json()['success']:
+ token = login_response.json()['token']
+ user_id = login_response.json()['user_id']
+ created_users.append({
+ 'username': user['username'],
+ 'user_id': user_id,
+ 'token': token,
+ 'headers': {'Authorization': f'Bearer {token}'}
+ })
+ logger.info(f"用户登录成功: {user['username']}")
+ else:
+ logger.error(f"用户登录失败: {user['username']}")
+ else:
+ logger.error(f"用户注册失败: {user['username']} - {register_response.json()['message']}")
+
+ except Exception as e:
+ logger.error(f"创建用户失败: {user['username']} - {e}")
+
+ return created_users
+
+def test_cards_isolation(users):
+ """测试卡券管理的用户隔离"""
+ logger.info("测试卡券管理的用户隔离...")
+
+ if len(users) < 2:
+ logger.error("需要至少2个用户进行隔离测试")
+ return False
+
+ user1, user2 = users[0], users[1]
+
+ # 用户1创建卡券
+ card1_data = {
+ "name": "用户1的卡券",
+ "type": "text",
+ "text_content": "这是用户1的卡券内容",
+ "description": "用户1创建的测试卡券"
+ }
+
+ response1 = requests.post(f"{BASE_URL}/cards", json=card1_data, headers=user1['headers'])
+ if response1.status_code == 200:
+ card1_id = response1.json()['id']
+ logger.info(f"用户1创建卡券成功: ID={card1_id}")
+ else:
+ logger.error(f"用户1创建卡券失败: {response1.text}")
+ return False
+
+ # 用户2创建卡券
+ card2_data = {
+ "name": "用户2的卡券",
+ "type": "text",
+ "text_content": "这是用户2的卡券内容",
+ "description": "用户2创建的测试卡券"
+ }
+
+ response2 = requests.post(f"{BASE_URL}/cards", json=card2_data, headers=user2['headers'])
+ if response2.status_code == 200:
+ card2_id = response2.json()['id']
+ logger.info(f"用户2创建卡券成功: ID={card2_id}")
+ else:
+ logger.error(f"用户2创建卡券失败: {response2.text}")
+ return False
+
+ # 验证用户1只能看到自己的卡券
+ response1_list = requests.get(f"{BASE_URL}/cards", headers=user1['headers'])
+ if response1_list.status_code == 200:
+ user1_cards = response1_list.json()
+ user1_card_names = [card['name'] for card in user1_cards]
+
+ if "用户1的卡券" in user1_card_names and "用户2的卡券" not in user1_card_names:
+ logger.info("✅ 用户1卡券隔离验证通过")
+ else:
+ logger.error("❌ 用户1卡券隔离验证失败")
+ return False
+ else:
+ logger.error(f"获取用户1卡券列表失败: {response1_list.text}")
+ return False
+
+ # 验证用户2只能看到自己的卡券
+ response2_list = requests.get(f"{BASE_URL}/cards", headers=user2['headers'])
+ if response2_list.status_code == 200:
+ user2_cards = response2_list.json()
+ user2_card_names = [card['name'] for card in user2_cards]
+
+ if "用户2的卡券" in user2_card_names and "用户1的卡券" not in user2_card_names:
+ logger.info("✅ 用户2卡券隔离验证通过")
+ else:
+ logger.error("❌ 用户2卡券隔离验证失败")
+ return False
+ else:
+ logger.error(f"获取用户2卡券列表失败: {response2_list.text}")
+ return False
+
+ # 验证跨用户访问被拒绝
+ response_cross = requests.get(f"{BASE_URL}/cards/{card2_id}", headers=user1['headers'])
+ if response_cross.status_code == 403 or response_cross.status_code == 404:
+ logger.info("✅ 跨用户卡券访问被正确拒绝")
+ else:
+ logger.error("❌ 跨用户卡券访问未被拒绝")
+ return False
+
+ return True
+
+def test_user_settings(users):
+ """测试用户设置功能"""
+ logger.info("测试用户设置功能...")
+
+ if len(users) < 2:
+ logger.error("需要至少2个用户进行设置测试")
+ return False
+
+ user1, user2 = users[0], users[1]
+
+ # 用户1设置主题颜色
+ setting1_data = {"value": "#ff0000", "description": "用户1的红色主题"}
+ response1 = requests.put(f"{BASE_URL}/user-settings/theme_color",
+ json=setting1_data, headers=user1['headers'])
+
+ if response1.status_code == 200:
+ logger.info("用户1主题颜色设置成功")
+ else:
+ logger.error(f"用户1主题颜色设置失败: {response1.text}")
+ return False
+
+ # 用户2设置主题颜色
+ setting2_data = {"value": "#00ff00", "description": "用户2的绿色主题"}
+ response2 = requests.put(f"{BASE_URL}/user-settings/theme_color",
+ json=setting2_data, headers=user2['headers'])
+
+ if response2.status_code == 200:
+ logger.info("用户2主题颜色设置成功")
+ else:
+ logger.error(f"用户2主题颜色设置失败: {response2.text}")
+ return False
+
+ # 验证用户1的设置
+ response1_get = requests.get(f"{BASE_URL}/user-settings/theme_color", headers=user1['headers'])
+ if response1_get.status_code == 200:
+ user1_color = response1_get.json()['value']
+ if user1_color == "#ff0000":
+ logger.info("✅ 用户1主题颜色隔离验证通过")
+ else:
+ logger.error(f"❌ 用户1主题颜色错误: {user1_color}")
+ return False
+ else:
+ logger.error(f"获取用户1主题颜色失败: {response1_get.text}")
+ return False
+
+ # 验证用户2的设置
+ response2_get = requests.get(f"{BASE_URL}/user-settings/theme_color", headers=user2['headers'])
+ if response2_get.status_code == 200:
+ user2_color = response2_get.json()['value']
+ if user2_color == "#00ff00":
+ logger.info("✅ 用户2主题颜色隔离验证通过")
+ else:
+ logger.error(f"❌ 用户2主题颜色错误: {user2_color}")
+ return False
+ else:
+ logger.error(f"获取用户2主题颜色失败: {response2_get.text}")
+ return False
+
+ return True
+
+def cleanup_test_data(users):
+ """清理测试数据"""
+ logger.info("清理测试数据...")
+
+ try:
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+
+ # 清理测试用户
+ test_usernames = [user['username'] for user in users]
+ if test_usernames:
+ placeholders = ','.join(['?' for _ in test_usernames])
+ cursor.execute(f'DELETE FROM users WHERE username IN ({placeholders})', test_usernames)
+ user_count = cursor.rowcount
+ else:
+ user_count = 0
+
+ # 清理测试卡券
+ cursor.execute('DELETE FROM cards WHERE name LIKE "用户%的卡券"')
+ card_count = cursor.rowcount
+
+ # 清理测试验证码
+ cursor.execute('DELETE FROM email_verifications WHERE email LIKE "%@test.com"')
+ email_count = cursor.rowcount
+
+ cursor.execute('DELETE FROM captcha_codes WHERE session_id LIKE "test_%"')
+ captcha_count = cursor.rowcount
+
+ conn.commit()
+ conn.close()
+
+ logger.info(f"清理完成: 用户{user_count}个, 卡券{card_count}个, 邮箱验证码{email_count}个, 图形验证码{captcha_count}个")
+
+ except Exception as e:
+ logger.error(f"清理失败: {e}")
+
+def main():
+ """主测试函数"""
+ print("🚀 完整的多用户数据隔离测试")
+ print("=" * 60)
+
+ try:
+ # 创建测试用户
+ users = create_test_users()
+
+ if len(users) < 2:
+ print("❌ 测试失败:无法创建足够的测试用户")
+ return False
+
+ print(f"✅ 成功创建 {len(users)} 个测试用户")
+
+ # 测试卡券管理隔离
+ cards_success = test_cards_isolation(users)
+
+ # 测试用户设置
+ settings_success = test_user_settings(users)
+
+ # 清理测试数据
+ cleanup_test_data(users)
+
+ print("\n" + "=" * 60)
+ if cards_success and settings_success:
+ print("🎉 完整的多用户数据隔离测试全部通过!")
+
+ print("\n📋 测试总结:")
+ print("✅ 卡券管理完全隔离")
+ print("✅ 用户设置完全隔离")
+ print("✅ 跨用户访问被正确拒绝")
+ print("✅ 数据库层面用户绑定正确")
+
+ return True
+ else:
+ print("❌ 多用户数据隔离测试失败!")
+ return False
+
+ except Exception as e:
+ print(f"💥 测试异常: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+if __name__ == "__main__":
+ main()
diff --git a/test_cookie_log_display.py b/test_cookie_log_display.py
new file mode 100644
index 0000000..1834a30
--- /dev/null
+++ b/test_cookie_log_display.py
@@ -0,0 +1,166 @@
+#!/usr/bin/env python3
+"""
+测试Cookie ID在日志中的显示
+"""
+
+import time
+import asyncio
+from loguru import logger
+
+# 模拟多个Cookie的日志输出
+async def test_cookie_log_display():
+ """测试Cookie ID在日志中的显示效果"""
+
+ print("🧪 测试Cookie ID日志显示")
+ print("=" * 50)
+
+ # 模拟不同Cookie的日志输出
+ cookies = [
+ {"id": "user1_cookie", "name": "用户1的账号"},
+ {"id": "user2_cookie", "name": "用户2的账号"},
+ {"id": "admin_cookie", "name": "管理员账号"}
+ ]
+
+ print("📋 模拟多用户系统日志输出:")
+ print("-" * 50)
+
+ for i, cookie in enumerate(cookies):
+ cookie_id = cookie["id"]
+ cookie_name = cookie["name"]
+
+ # 模拟各种类型的日志
+ msg_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+
+ # 1. Token相关日志
+ logger.info(f"【{cookie_id}】开始刷新token...")
+ await asyncio.sleep(0.1)
+ logger.info(f"【{cookie_id}】Token刷新成功")
+
+ # 2. 连接相关日志
+ logger.info(f"【{cookie_id}】连接注册完成")
+
+ # 3. 系统消息日志
+ logger.info(f"[{msg_time}] 【{cookie_id}】【系统】小闲鱼智能提示:")
+ logger.info(f" - 欢迎使用闲鱼智能助手")
+
+ # 4. 消息处理日志
+ logger.info(f"[{msg_time}] 【{cookie_id}】【系统】买家已付款,准备自动发货")
+ logger.info(f"【{cookie_id}】准备自动发货: item_id=123456, item_title=测试商品")
+
+ # 5. 回复相关日志
+ logger.info(f"【{cookie_id}】使用默认回复: 您好,感谢您的咨询!")
+ logger.info(f"【{cookie_id}】AI回复生成成功: 这是AI生成的回复")
+
+ # 6. 错误日志
+ if i == 1: # 只在第二个Cookie上模拟错误
+ logger.error(f"【{cookie_id}】Token刷新失败: 网络连接超时")
+
+ print() # 空行分隔
+ await asyncio.sleep(0.2)
+
+ print("=" * 50)
+ print("✅ 日志显示测试完成")
+
+ print("\n📊 日志格式说明:")
+ print("• 【Cookie_ID】- 标识具体的用户账号")
+ print("• 【系统】- 系统级别的消息")
+ print("• [时间戳] - 消息发生的具体时间")
+ print("• 不同用户的操作现在可以清晰区分")
+
+ print("\n🎯 改进效果:")
+ print("• ✅ 多用户环境下可以快速定位问题")
+ print("• ✅ 日志分析更加高效")
+ print("• ✅ 运维监控更加精准")
+ print("• ✅ 调试过程更加清晰")
+
+def test_log_format_comparison():
+ """对比改进前后的日志格式"""
+
+ print("\n🔍 日志格式对比")
+ print("=" * 50)
+
+ print("📝 改进前的日志格式:")
+ print("2025-07-25 14:23:47.770 | INFO | XianyuAutoAsync:init:1360 - 获取初始token...")
+ print("2025-07-25 14:23:47.771 | INFO | XianyuAutoAsync:refresh_token:134 - 开始刷新token...")
+ print("2025-07-25 14:23:48.269 | INFO | XianyuAutoAsync:refresh_token:200 - Token刷新成功")
+ print("2025-07-25 14:23:49.286 | INFO | XianyuAutoAsync:init:1407 - 连接注册完成")
+ print("2025-07-25 14:23:49.288 | INFO | XianyuAutoAsync:handle_message:1663 - [2025-07-25 14:23:49] 【系统】小闲鱼智能提示:")
+
+ print("\n📝 改进后的日志格式:")
+ print("2025-07-25 14:23:47.770 | INFO | XianyuAutoAsync:init:1360 - 【user1_cookie】获取初始token...")
+ print("2025-07-25 14:23:47.771 | INFO | XianyuAutoAsync:refresh_token:134 - 【user1_cookie】开始刷新token...")
+ print("2025-07-25 14:23:48.269 | INFO | XianyuAutoAsync:refresh_token:200 - 【user1_cookie】Token刷新成功")
+ print("2025-07-25 14:23:49.286 | INFO | XianyuAutoAsync:init:1407 - 【user1_cookie】连接注册完成")
+ print("2025-07-25 14:23:49.288 | INFO | XianyuAutoAsync:handle_message:1663 - [2025-07-25 14:23:49] 【user1_cookie】【系统】小闲鱼智能提示:")
+
+ print("\n🎯 改进优势:")
+ print("• 🔍 快速识别: 一眼就能看出是哪个用户的操作")
+ print("• 🐛 问题定位: 多用户环境下快速定位问题源头")
+ print("• 📈 监控分析: 可以按用户统计操作频率和成功率")
+ print("• 🔧 运维管理: 便于针对特定用户进行故障排查")
+
+def generate_log_analysis_tips():
+ """生成日志分析技巧"""
+
+ print("\n💡 日志分析技巧")
+ print("=" * 50)
+
+ tips = [
+ {
+ "title": "按用户过滤日志",
+ "command": "grep '【user1_cookie】' xianyu_2025-07-25.log",
+ "description": "查看特定用户的所有操作日志"
+ },
+ {
+ "title": "查看Token刷新情况",
+ "command": "grep '【.*】.*Token' xianyu_2025-07-25.log",
+ "description": "监控所有用户的Token刷新状态"
+ },
+ {
+ "title": "统计用户活跃度",
+ "command": "grep -o '【[^】]*】' xianyu_2025-07-25.log | sort | uniq -c",
+ "description": "统计各用户的操作次数"
+ },
+ {
+ "title": "查看系统消息",
+ "command": "grep '【系统】' xianyu_2025-07-25.log",
+ "description": "查看所有系统级别的消息"
+ },
+ {
+ "title": "监控错误日志",
+ "command": "grep 'ERROR.*【.*】' xianyu_2025-07-25.log",
+ "description": "查看特定用户的错误信息"
+ }
+ ]
+
+ for i, tip in enumerate(tips, 1):
+ print(f"{i}. {tip['title']}")
+ print(f" 命令: {tip['command']}")
+ print(f" 说明: {tip['description']}")
+ print()
+
+async def main():
+ """主函数"""
+ print("🚀 Cookie ID日志显示测试工具")
+ print("=" * 60)
+
+ # 测试日志显示
+ await test_cookie_log_display()
+
+ # 对比日志格式
+ test_log_format_comparison()
+
+ # 生成分析技巧
+ generate_log_analysis_tips()
+
+ print("=" * 60)
+ print("🎉 测试完成!现在您可以在多用户环境下清晰地识别每个用户的操作。")
+
+ print("\n📋 下一步建议:")
+ print("1. 重启服务以应用日志改进")
+ print("2. 观察实际运行中的日志输出")
+ print("3. 使用提供的命令进行日志分析")
+ print("4. 根据需要调整日志级别和格式")
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/test_database_backup.py b/test_database_backup.py
new file mode 100644
index 0000000..a98a005
--- /dev/null
+++ b/test_database_backup.py
@@ -0,0 +1,254 @@
+#!/usr/bin/env python3
+"""
+测试数据库备份和恢复功能
+"""
+
+import requests
+import os
+import time
+import sqlite3
+from loguru import logger
+
+BASE_URL = "http://localhost:8080"
+
+def test_admin_login():
+ """测试管理员登录"""
+ logger.info("测试管理员登录...")
+
+ response = requests.post(f"{BASE_URL}/login",
+ json={'username': 'admin', 'password': 'admin123'})
+
+ if response.json()['success']:
+ token = response.json()['token']
+ user_id = response.json()['user_id']
+
+ logger.info(f"管理员登录成功,token: {token[:20]}...")
+ return {
+ 'token': token,
+ 'user_id': user_id,
+ 'headers': {'Authorization': f'Bearer {token}'}
+ }
+ else:
+ logger.error("管理员登录失败")
+ return None
+
+def test_database_download(admin):
+ """测试数据库下载"""
+ logger.info("测试数据库下载...")
+
+ try:
+ response = requests.get(f"{BASE_URL}/admin/backup/download",
+ headers=admin['headers'])
+
+ if response.status_code == 200:
+ # 保存下载的文件
+ timestamp = int(time.time())
+ backup_filename = f"test_backup_{timestamp}.db"
+
+ with open(backup_filename, 'wb') as f:
+ f.write(response.content)
+
+ # 验证下载的文件
+ file_size = os.path.getsize(backup_filename)
+ logger.info(f"✅ 数据库下载成功: {backup_filename} ({file_size} bytes)")
+
+ # 验证是否为有效的SQLite数据库
+ try:
+ conn = sqlite3.connect(backup_filename)
+ cursor = conn.cursor()
+ cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
+ tables = cursor.fetchall()
+ conn.close()
+
+ table_names = [table[0] for table in tables]
+ logger.info(f" 数据库包含 {len(table_names)} 个表: {', '.join(table_names[:5])}...")
+
+ return backup_filename
+
+ except sqlite3.Error as e:
+ logger.error(f"❌ 下载的文件不是有效的SQLite数据库: {e}")
+ return None
+
+ else:
+ logger.error(f"❌ 数据库下载失败: {response.status_code}")
+ return None
+
+ except Exception as e:
+ logger.error(f"❌ 数据库下载异常: {e}")
+ return None
+
+def test_backup_file_list(admin):
+ """测试备份文件列表"""
+ logger.info("测试备份文件列表...")
+
+ try:
+ response = requests.get(f"{BASE_URL}/admin/backup/list",
+ headers=admin['headers'])
+
+ if response.status_code == 200:
+ data = response.json()
+ backups = data.get('backups', [])
+ logger.info(f"✅ 获取备份文件列表成功,共 {len(backups)} 个备份文件")
+
+ for backup in backups[:3]: # 显示前3个
+ logger.info(f" {backup['filename']} - {backup['size_mb']}MB - {backup['created_time']}")
+
+ return True
+ else:
+ logger.error(f"❌ 获取备份文件列表失败: {response.status_code}")
+ return False
+
+ except Exception as e:
+ logger.error(f"❌ 获取备份文件列表异常: {e}")
+ return False
+
+def test_database_upload(admin, backup_file):
+ """测试数据库上传(模拟,不实际执行)"""
+ logger.info("测试数据库上传准备...")
+
+ if not backup_file or not os.path.exists(backup_file):
+ logger.error("❌ 没有有效的备份文件进行上传测试")
+ return False
+
+ try:
+ # 检查文件大小
+ file_size = os.path.getsize(backup_file)
+ if file_size > 100 * 1024 * 1024: # 100MB
+ logger.warning(f"⚠️ 备份文件过大: {file_size} bytes")
+ return False
+
+ # 验证文件格式
+ try:
+ conn = sqlite3.connect(backup_file)
+ cursor = conn.cursor()
+ cursor.execute("SELECT COUNT(*) FROM users")
+ user_count = cursor.fetchone()[0]
+ conn.close()
+
+ logger.info(f"✅ 备份文件验证通过,包含 {user_count} 个用户")
+ logger.info(" 注意:实际上传会替换当前数据库,此处仅验证文件有效性")
+
+ return True
+
+ except sqlite3.Error as e:
+ logger.error(f"❌ 备份文件验证失败: {e}")
+ return False
+
+ except Exception as e:
+ logger.error(f"❌ 数据库上传准备异常: {e}")
+ return False
+
+def cleanup_test_files():
+ """清理测试文件"""
+ logger.info("清理测试文件...")
+
+ import glob
+ test_files = glob.glob("test_backup_*.db")
+
+ for file_path in test_files:
+ try:
+ os.remove(file_path)
+ logger.info(f" 删除测试文件: {file_path}")
+ except Exception as e:
+ logger.warning(f" 删除文件失败: {file_path} - {e}")
+
+def main():
+ """主测试函数"""
+ print("🚀 数据库备份和恢复功能测试")
+ print("=" * 60)
+
+ print("📋 测试内容:")
+ print("• 管理员登录")
+ print("• 数据库备份下载")
+ print("• 备份文件列表查询")
+ print("• 备份文件验证")
+ print("• 数据库上传准备(不实际执行)")
+
+ try:
+ # 管理员登录
+ admin = test_admin_login()
+ if not admin:
+ print("❌ 测试失败:管理员登录失败")
+ return False
+
+ print("✅ 管理员登录成功")
+
+ # 测试各项功能
+ tests = [
+ ("数据库下载", lambda: test_database_download(admin)),
+ ("备份文件列表", lambda: test_backup_file_list(admin)),
+ ]
+
+ results = []
+ backup_file = None
+
+ for test_name, test_func in tests:
+ print(f"\n🧪 测试 {test_name}...")
+ try:
+ result = test_func()
+ if test_name == "数据库下载" and result:
+ backup_file = result
+ result = True
+
+ results.append((test_name, result))
+ if result:
+ print(f"✅ {test_name} 测试通过")
+ else:
+ print(f"❌ {test_name} 测试失败")
+ except Exception as e:
+ print(f"💥 {test_name} 测试异常: {e}")
+ results.append((test_name, False))
+
+ # 测试数据库上传准备
+ print(f"\n🧪 测试数据库上传准备...")
+ upload_result = test_database_upload(admin, backup_file)
+ results.append(("数据库上传准备", upload_result))
+ if upload_result:
+ print(f"✅ 数据库上传准备 测试通过")
+ else:
+ print(f"❌ 数据库上传准备 测试失败")
+
+ # 清理测试文件
+ cleanup_test_files()
+
+ print("\n" + "=" * 60)
+ print("🎉 数据库备份和恢复功能测试完成!")
+
+ print("\n📊 测试结果:")
+ passed = 0
+ for test_name, result in results:
+ status = "✅ 通过" if result else "❌ 失败"
+ print(f" {test_name}: {status}")
+ if result:
+ passed += 1
+
+ print(f"\n📈 总体结果: {passed}/{len(results)} 项测试通过")
+
+ if passed == len(results):
+ print("🎊 所有测试都通过了!数据库备份功能正常工作。")
+
+ print("\n💡 使用说明:")
+ print("1. 登录admin账号,进入系统设置")
+ print("2. 在备份管理区域点击'下载数据库'")
+ print("3. 数据库文件会自动下载到本地")
+ print("4. 需要恢复时,选择.db文件并点击'恢复数据库'")
+ print("5. 系统会自动验证文件并替换数据库")
+
+ print("\n⚠️ 重要提醒:")
+ print("• 数据库恢复会完全替换当前所有数据")
+ print("• 建议在恢复前先下载当前数据库作为备份")
+ print("• 恢复后建议刷新页面以加载新数据")
+
+ return True
+ else:
+ print("⚠️ 部分测试失败,请检查相关功能。")
+ return False
+
+ except Exception as e:
+ print(f"💥 测试异常: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+if __name__ == "__main__":
+ main()
diff --git a/test_docker_deployment.sh b/test_docker_deployment.sh
new file mode 100644
index 0000000..f50051c
--- /dev/null
+++ b/test_docker_deployment.sh
@@ -0,0 +1,192 @@
+#!/bin/bash
+
+# Docker多用户系统部署测试脚本
+
+echo "🚀 Docker多用户系统部署测试"
+echo "=================================="
+
+# 颜色定义
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[1;33m'
+BLUE='\033[0;34m'
+NC='\033[0m' # No Color
+
+# 检查Docker和Docker Compose
+echo -e "${BLUE}1. 检查Docker环境${NC}"
+if ! command -v docker &> /dev/null; then
+ echo -e "${RED}❌ Docker未安装${NC}"
+ exit 1
+fi
+
+if ! command -v docker-compose &> /dev/null; then
+ echo -e "${RED}❌ Docker Compose未安装${NC}"
+ exit 1
+fi
+
+echo -e "${GREEN}✅ Docker环境检查通过${NC}"
+echo " Docker版本: $(docker --version)"
+echo " Docker Compose版本: $(docker-compose --version)"
+
+# 检查必要文件
+echo -e "\n${BLUE}2. 检查部署文件${NC}"
+required_files=("Dockerfile" "docker-compose.yml" "requirements.txt")
+
+for file in "${required_files[@]}"; do
+ if [ -f "$file" ]; then
+ echo -e "${GREEN}✅ $file${NC}"
+ else
+ echo -e "${RED}❌ $file 不存在${NC}"
+ exit 1
+ fi
+done
+
+# 检查新增依赖
+echo -e "\n${BLUE}3. 检查新增依赖${NC}"
+if grep -q "Pillow" requirements.txt; then
+ echo -e "${GREEN}✅ Pillow依赖已添加${NC}"
+else
+ echo -e "${RED}❌ Pillow依赖缺失${NC}"
+ exit 1
+fi
+
+# 停止现有容器
+echo -e "\n${BLUE}4. 停止现有容器${NC}"
+docker-compose down
+echo -e "${GREEN}✅ 容器已停止${NC}"
+
+# 构建镜像
+echo -e "\n${BLUE}5. 构建Docker镜像${NC}"
+echo "这可能需要几分钟时间..."
+if docker-compose build --no-cache; then
+ echo -e "${GREEN}✅ 镜像构建成功${NC}"
+else
+ echo -e "${RED}❌ 镜像构建失败${NC}"
+ exit 1
+fi
+
+# 启动服务
+echo -e "\n${BLUE}6. 启动服务${NC}"
+if docker-compose up -d; then
+ echo -e "${GREEN}✅ 服务启动成功${NC}"
+else
+ echo -e "${RED}❌ 服务启动失败${NC}"
+ exit 1
+fi
+
+# 等待服务就绪
+echo -e "\n${BLUE}7. 等待服务就绪${NC}"
+echo "等待30秒让服务完全启动..."
+sleep 30
+
+# 检查容器状态
+echo -e "\n${BLUE}8. 检查容器状态${NC}"
+if docker-compose ps | grep -q "Up"; then
+ echo -e "${GREEN}✅ 容器运行正常${NC}"
+ docker-compose ps
+else
+ echo -e "${RED}❌ 容器运行异常${NC}"
+ docker-compose ps
+ echo -e "\n${YELLOW}查看日志:${NC}"
+ docker-compose logs --tail=20
+ exit 1
+fi
+
+# 健康检查
+echo -e "\n${BLUE}9. 健康检查${NC}"
+max_attempts=10
+attempt=1
+
+while [ $attempt -le $max_attempts ]; do
+ if curl -s http://localhost:8080/health > /dev/null; then
+ echo -e "${GREEN}✅ 健康检查通过${NC}"
+ break
+ else
+ echo -e "${YELLOW}⏳ 尝试 $attempt/$max_attempts - 等待服务响应...${NC}"
+ sleep 3
+ ((attempt++))
+ fi
+done
+
+if [ $attempt -gt $max_attempts ]; then
+ echo -e "${RED}❌ 健康检查失败${NC}"
+ echo -e "\n${YELLOW}查看日志:${NC}"
+ docker-compose logs --tail=20
+ exit 1
+fi
+
+# 测试图形验证码API
+echo -e "\n${BLUE}10. 测试图形验证码功能${NC}"
+response=$(curl -s -X POST http://localhost:8080/generate-captcha \
+ -H "Content-Type: application/json" \
+ -d '{"session_id": "test_session"}')
+
+if echo "$response" | grep -q '"success":true'; then
+ echo -e "${GREEN}✅ 图形验证码API正常${NC}"
+else
+ echo -e "${RED}❌ 图形验证码API异常${NC}"
+ echo "响应: $response"
+fi
+
+# 测试注册页面
+echo -e "\n${BLUE}11. 测试注册页面${NC}"
+if curl -s http://localhost:8080/register.html | grep -q "用户注册"; then
+ echo -e "${GREEN}✅ 注册页面可访问${NC}"
+else
+ echo -e "${RED}❌ 注册页面访问失败${NC}"
+fi
+
+# 测试登录页面
+echo -e "\n${BLUE}12. 测试登录页面${NC}"
+if curl -s http://localhost:8080/login.html | grep -q "登录"; then
+ echo -e "${GREEN}✅ 登录页面可访问${NC}"
+else
+ echo -e "${RED}❌ 登录页面访问失败${NC}"
+fi
+
+# 检查Pillow安装
+echo -e "\n${BLUE}13. 检查Pillow安装${NC}"
+if docker-compose exec -T xianyu-app python -c "from PIL import Image; print('Pillow OK')" 2>/dev/null | grep -q "Pillow OK"; then
+ echo -e "${GREEN}✅ Pillow安装正常${NC}"
+else
+ echo -e "${RED}❌ Pillow安装异常${NC}"
+fi
+
+# 检查字体支持
+echo -e "\n${BLUE}14. 检查字体支持${NC}"
+if docker-compose exec -T xianyu-app ls /usr/share/fonts/ 2>/dev/null | grep -q "dejavu"; then
+ echo -e "${GREEN}✅ 字体支持正常${NC}"
+else
+ echo -e "${YELLOW}⚠️ 字体支持可能有问题${NC}"
+fi
+
+# 显示访问信息
+echo -e "\n${GREEN}🎉 Docker部署测试完成!${NC}"
+echo "=================================="
+echo -e "${BLUE}访问信息:${NC}"
+echo "• 主页: http://localhost:8080"
+echo "• 登录页面: http://localhost:8080/login.html"
+echo "• 注册页面: http://localhost:8080/register.html"
+echo "• 健康检查: http://localhost:8080/health"
+echo ""
+echo -e "${BLUE}默认管理员账号:${NC}"
+echo "• 用户名: admin"
+echo "• 密码: admin123"
+echo ""
+echo -e "${BLUE}多用户功能:${NC}"
+echo "• ✅ 用户注册"
+echo "• ✅ 图形验证码"
+echo "• ✅ 邮箱验证"
+echo "• ✅ 数据隔离"
+echo ""
+echo -e "${YELLOW}管理命令:${NC}"
+echo "• 查看日志: docker-compose logs -f"
+echo "• 停止服务: docker-compose down"
+echo "• 重启服务: docker-compose restart"
+echo "• 查看状态: docker-compose ps"
+
+# 可选:显示资源使用情况
+echo -e "\n${BLUE}资源使用情况:${NC}"
+docker stats --no-stream --format "table {{.Container}}\t{{.CPUPerc}}\t{{.MemUsage}}" | grep xianyu || echo "无法获取资源使用情况"
+
+echo -e "\n${GREEN}部署测试完成!系统已就绪。${NC}"
diff --git a/test_improvements.py b/test_improvements.py
new file mode 100644
index 0000000..b60429b
--- /dev/null
+++ b/test_improvements.py
@@ -0,0 +1,257 @@
+#!/usr/bin/env python3
+"""
+测试系统改进功能
+"""
+
+import requests
+import time
+from loguru import logger
+
+BASE_URL = "http://localhost:8080"
+
+def test_admin_login():
+ """测试管理员登录"""
+ logger.info("测试管理员登录...")
+
+ response = requests.post(f"{BASE_URL}/login",
+ json={'username': 'admin', 'password': 'admin123'})
+
+ if response.json()['success']:
+ token = response.json()['token']
+ user_id = response.json()['user_id']
+
+ logger.info(f"管理员登录成功,token: {token[:20]}...")
+ return {
+ 'token': token,
+ 'user_id': user_id,
+ 'headers': {'Authorization': f'Bearer {token}'}
+ }
+ else:
+ logger.error("管理员登录失败")
+ return None
+
+def test_page_access():
+ """测试页面访问"""
+ logger.info("测试页面访问...")
+
+ pages = [
+ ("用户管理", "/user_management.html"),
+ ("日志管理", "/log_management.html"),
+ ("数据管理", "/data_management.html")
+ ]
+
+ results = []
+ for page_name, page_url in pages:
+ try:
+ response = requests.get(f"{BASE_URL}{page_url}", timeout=5)
+
+ if response.status_code == 200:
+ logger.info(f"✅ {page_name} 页面访问成功 (200)")
+ results.append((page_name, True))
+ else:
+ logger.error(f"❌ {page_name} 页面访问失败 ({response.status_code})")
+ results.append((page_name, False))
+
+ except Exception as e:
+ logger.error(f"❌ {page_name} 页面访问异常: {e}")
+ results.append((page_name, False))
+
+ return results
+
+def test_data_management_api(admin):
+ """测试数据管理API"""
+ logger.info("测试数据管理API...")
+
+ # 测试获取表数据
+ tables_to_test = ['users', 'cookies', 'cards']
+
+ for table in tables_to_test:
+ try:
+ response = requests.get(f"{BASE_URL}/admin/data/{table}",
+ headers=admin['headers'])
+
+ if response.status_code == 200:
+ data = response.json()
+ if data['success']:
+ logger.info(f"✅ 获取 {table} 表数据成功,共 {data['count']} 条记录")
+ else:
+ logger.error(f"❌ 获取 {table} 表数据失败: {data.get('message', '未知错误')}")
+ return False
+ else:
+ logger.error(f"❌ 获取 {table} 表数据失败: {response.status_code}")
+ return False
+
+ except Exception as e:
+ logger.error(f"❌ 获取 {table} 表数据异常: {e}")
+ return False
+
+ return True
+
+def test_log_management_api(admin):
+ """测试日志管理API"""
+ logger.info("测试日志管理API...")
+
+ try:
+ # 测试获取日志
+ response = requests.get(f"{BASE_URL}/admin/logs?lines=10",
+ headers=admin['headers'])
+
+ if response.status_code == 200:
+ data = response.json()
+ logs = data.get('logs', [])
+ logger.info(f"✅ 获取系统日志成功,共 {len(logs)} 条")
+
+ # 测试日志级别过滤
+ response = requests.get(f"{BASE_URL}/admin/logs?lines=5&level=info",
+ headers=admin['headers'])
+
+ if response.status_code == 200:
+ data = response.json()
+ info_logs = data.get('logs', [])
+ logger.info(f"✅ INFO级别日志过滤成功,共 {len(info_logs)} 条")
+ return True
+ else:
+ logger.error(f"❌ 日志级别过滤失败: {response.status_code}")
+ return False
+ else:
+ logger.error(f"❌ 获取系统日志失败: {response.status_code}")
+ return False
+
+ except Exception as e:
+ logger.error(f"❌ 日志管理API测试异常: {e}")
+ return False
+
+def test_database_backup_api(admin):
+ """测试数据库备份API"""
+ logger.info("测试数据库备份API...")
+
+ try:
+ # 测试数据库下载
+ response = requests.get(f"{BASE_URL}/admin/backup/download",
+ headers=admin['headers'])
+
+ if response.status_code == 200:
+ logger.info(f"✅ 数据库备份下载成功,文件大小: {len(response.content)} bytes")
+
+ # 测试备份文件列表
+ response = requests.get(f"{BASE_URL}/admin/backup/list",
+ headers=admin['headers'])
+
+ if response.status_code == 200:
+ data = response.json()
+ backups = data.get('backups', [])
+ logger.info(f"✅ 获取备份文件列表成功,共 {len(backups)} 个备份")
+ return True
+ else:
+ logger.error(f"❌ 获取备份文件列表失败: {response.status_code}")
+ return False
+ else:
+ logger.error(f"❌ 数据库备份下载失败: {response.status_code}")
+ return False
+
+ except Exception as e:
+ logger.error(f"❌ 数据库备份API测试异常: {e}")
+ return False
+
+def main():
+ """主测试函数"""
+ print("🚀 系统改进功能测试")
+ print("=" * 60)
+
+ print("📋 测试内容:")
+ print("• 页面访问测试")
+ print("• 日志管理功能")
+ print("• 数据管理功能")
+ print("• 数据库备份功能")
+
+ try:
+ # 管理员登录
+ admin = test_admin_login()
+ if not admin:
+ print("❌ 测试失败:管理员登录失败")
+ return False
+
+ print("✅ 管理员登录成功")
+
+ # 测试页面访问
+ print(f"\n🧪 测试页面访问...")
+ page_results = test_page_access()
+
+ page_success = all(result[1] for result in page_results)
+ for page_name, success in page_results:
+ status = "✅ 通过" if success else "❌ 失败"
+ print(f" {page_name}: {status}")
+
+ # 测试API功能
+ tests = [
+ ("数据管理API", lambda: test_data_management_api(admin)),
+ ("日志管理API", lambda: test_log_management_api(admin)),
+ ("数据库备份API", lambda: test_database_backup_api(admin)),
+ ]
+
+ api_results = []
+ for test_name, test_func in tests:
+ print(f"\n🧪 测试 {test_name}...")
+ try:
+ result = test_func()
+ api_results.append((test_name, result))
+ if result:
+ print(f"✅ {test_name} 测试通过")
+ else:
+ print(f"❌ {test_name} 测试失败")
+ except Exception as e:
+ print(f"💥 {test_name} 测试异常: {e}")
+ api_results.append((test_name, False))
+
+ print("\n" + "=" * 60)
+ print("🎉 系统改进功能测试完成!")
+
+ print("\n📊 测试结果:")
+
+ # 页面访问结果
+ print("页面访问:")
+ for page_name, success in page_results:
+ status = "✅ 通过" if success else "❌ 失败"
+ print(f" {page_name}: {status}")
+
+ # API功能结果
+ print("API功能:")
+ for test_name, success in api_results:
+ status = "✅ 通过" if success else "❌ 失败"
+ print(f" {test_name}: {status}")
+
+ # 总体结果
+ all_results = page_results + api_results
+ passed = sum(1 for _, success in all_results if success)
+ total = len(all_results)
+
+ print(f"\n📈 总体结果: {passed}/{total} 项测试通过")
+
+ if passed == total:
+ print("🎊 所有测试都通过了!系统改进功能正常工作。")
+
+ print("\n💡 功能说明:")
+ print("1. 日志管理:最新日志显示在最上面")
+ print("2. 系统设置:只保留数据库备份模式")
+ print("3. 数据管理:新增管理员专用的数据表管理功能")
+ print("4. 所有管理员功能都有严格的权限控制")
+
+ print("\n🎯 使用方法:")
+ print("• 使用admin账号登录系统")
+ print("• 在侧边栏可以看到管理员功能菜单")
+ print("• 点击相应功能进入管理页面")
+ print("• 数据管理可以查看和删除表中的数据")
+
+ return True
+ else:
+ print("⚠️ 部分测试失败,请检查相关功能。")
+ return False
+
+ except Exception as e:
+ print(f"💥 测试异常: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+if __name__ == "__main__":
+ main()
diff --git a/test_multiuser_system.py b/test_multiuser_system.py
new file mode 100644
index 0000000..0c841ce
--- /dev/null
+++ b/test_multiuser_system.py
@@ -0,0 +1,278 @@
+#!/usr/bin/env python3
+"""
+多用户系统功能测试
+"""
+
+import asyncio
+import json
+import time
+from db_manager import db_manager
+
+async def test_user_registration():
+ """测试用户注册功能"""
+ print("🧪 测试用户注册功能")
+ print("-" * 40)
+
+ # 测试邮箱验证码生成
+ print("1️⃣ 测试验证码生成...")
+ code = db_manager.generate_verification_code()
+ print(f" 生成的验证码: {code}")
+ assert len(code) == 6 and code.isdigit(), "验证码应该是6位数字"
+ print(" ✅ 验证码生成正常")
+
+ # 测试保存验证码
+ print("\n2️⃣ 测试验证码保存...")
+ test_email = "test@example.com"
+ success = db_manager.save_verification_code(test_email, code)
+ assert success, "验证码保存应该成功"
+ print(" ✅ 验证码保存成功")
+
+ # 测试验证码验证
+ print("\n3️⃣ 测试验证码验证...")
+ valid = db_manager.verify_email_code(test_email, code)
+ assert valid, "正确的验证码应该验证成功"
+ print(" ✅ 验证码验证成功")
+
+ # 测试验证码重复使用
+ print("\n4️⃣ 测试验证码重复使用...")
+ valid_again = db_manager.verify_email_code(test_email, code)
+ assert not valid_again, "已使用的验证码不应该再次验证成功"
+ print(" ✅ 验证码重复使用被正确阻止")
+
+ # 测试用户创建
+ print("\n5️⃣ 测试用户创建...")
+ test_username = "testuser"
+ test_password = "testpass123"
+
+ # 先清理可能存在的测试用户
+ try:
+ db_manager.conn.execute("DELETE FROM users WHERE username = ? OR email = ?", (test_username, test_email))
+ db_manager.conn.commit()
+ except:
+ pass
+
+ success = db_manager.create_user(test_username, test_email, test_password)
+ assert success, "用户创建应该成功"
+ print(" ✅ 用户创建成功")
+
+ # 测试重复用户名
+ print("\n6️⃣ 测试重复用户名...")
+ success = db_manager.create_user(test_username, "another@example.com", test_password)
+ assert not success, "重复用户名应该创建失败"
+ print(" ✅ 重复用户名被正确拒绝")
+
+ # 测试重复邮箱
+ print("\n7️⃣ 测试重复邮箱...")
+ success = db_manager.create_user("anotheruser", test_email, test_password)
+ assert not success, "重复邮箱应该创建失败"
+ print(" ✅ 重复邮箱被正确拒绝")
+
+ # 测试用户查询
+ print("\n8️⃣ 测试用户查询...")
+ user = db_manager.get_user_by_username(test_username)
+ assert user is not None, "应该能查询到创建的用户"
+ assert user['username'] == test_username, "用户名应该匹配"
+ assert user['email'] == test_email, "邮箱应该匹配"
+ print(" ✅ 用户查询成功")
+
+ # 测试密码验证
+ print("\n9️⃣ 测试密码验证...")
+ valid = db_manager.verify_user_password(test_username, test_password)
+ assert valid, "正确密码应该验证成功"
+
+ invalid = db_manager.verify_user_password(test_username, "wrongpassword")
+ assert not invalid, "错误密码应该验证失败"
+ print(" ✅ 密码验证正常")
+
+ # 清理测试数据
+ print("\n🧹 清理测试数据...")
+ db_manager.conn.execute("DELETE FROM users WHERE username = ?", (test_username,))
+ db_manager.conn.execute("DELETE FROM email_verifications WHERE email = ?", (test_email,))
+ db_manager.conn.commit()
+ print(" ✅ 测试数据清理完成")
+
+def test_user_isolation():
+ """测试用户数据隔离"""
+ print("\n🧪 测试用户数据隔离")
+ print("-" * 40)
+
+ # 创建测试用户
+ print("1️⃣ 创建测试用户...")
+ user1_name = "testuser1"
+ user2_name = "testuser2"
+ user1_email = "user1@test.com"
+ user2_email = "user2@test.com"
+ password = "testpass123"
+
+ # 清理可能存在的测试数据
+ try:
+ db_manager.conn.execute("DELETE FROM cookies WHERE id LIKE 'test_%'")
+ db_manager.conn.execute("DELETE FROM users WHERE username IN (?, ?)", (user1_name, user2_name))
+ db_manager.conn.commit()
+ except:
+ pass
+
+ # 创建用户
+ success1 = db_manager.create_user(user1_name, user1_email, password)
+ success2 = db_manager.create_user(user2_name, user2_email, password)
+ assert success1 and success2, "用户创建应该成功"
+
+ user1 = db_manager.get_user_by_username(user1_name)
+ user2 = db_manager.get_user_by_username(user2_name)
+ user1_id = user1['id']
+ user2_id = user2['id']
+ print(f" ✅ 用户创建成功: {user1_name}(ID:{user1_id}), {user2_name}(ID:{user2_id})")
+
+ # 测试Cookie隔离
+ print("\n2️⃣ 测试Cookie数据隔离...")
+
+ # 用户1添加cookies
+ db_manager.save_cookie("test_cookie_1", "cookie_value_1", user1_id)
+ db_manager.save_cookie("test_cookie_2", "cookie_value_2", user1_id)
+
+ # 用户2添加cookies
+ db_manager.save_cookie("test_cookie_3", "cookie_value_3", user2_id)
+ db_manager.save_cookie("test_cookie_4", "cookie_value_4", user2_id)
+
+ # 验证用户1只能看到自己的cookies
+ user1_cookies = db_manager.get_all_cookies(user1_id)
+ user1_cookie_ids = set(user1_cookies.keys())
+ expected_user1_cookies = {"test_cookie_1", "test_cookie_2"}
+
+ assert expected_user1_cookies.issubset(user1_cookie_ids), f"用户1应该能看到自己的cookies: {expected_user1_cookies}"
+ assert "test_cookie_3" not in user1_cookie_ids, "用户1不应该看到用户2的cookies"
+ assert "test_cookie_4" not in user1_cookie_ids, "用户1不应该看到用户2的cookies"
+ print(" ✅ 用户1的Cookie隔离正常")
+
+ # 验证用户2只能看到自己的cookies
+ user2_cookies = db_manager.get_all_cookies(user2_id)
+ user2_cookie_ids = set(user2_cookies.keys())
+ expected_user2_cookies = {"test_cookie_3", "test_cookie_4"}
+
+ assert expected_user2_cookies.issubset(user2_cookie_ids), f"用户2应该能看到自己的cookies: {expected_user2_cookies}"
+ assert "test_cookie_1" not in user2_cookie_ids, "用户2不应该看到用户1的cookies"
+ assert "test_cookie_2" not in user2_cookie_ids, "用户2不应该看到用户1的cookies"
+ print(" ✅ 用户2的Cookie隔离正常")
+
+ # 测试关键字隔离
+ print("\n3️⃣ 测试关键字数据隔离...")
+
+ # 添加关键字
+ user1_keywords = [("hello", "user1 reply"), ("price", "user1 price")]
+ user2_keywords = [("hello", "user2 reply"), ("info", "user2 info")]
+
+ db_manager.save_keywords("test_cookie_1", user1_keywords)
+ db_manager.save_keywords("test_cookie_3", user2_keywords)
+
+ # 验证关键字隔离
+ user1_all_keywords = db_manager.get_all_keywords(user1_id)
+ user2_all_keywords = db_manager.get_all_keywords(user2_id)
+
+ assert "test_cookie_1" in user1_all_keywords, "用户1应该能看到自己的关键字"
+ assert "test_cookie_3" not in user1_all_keywords, "用户1不应该看到用户2的关键字"
+
+ assert "test_cookie_3" in user2_all_keywords, "用户2应该能看到自己的关键字"
+ assert "test_cookie_1" not in user2_all_keywords, "用户2不应该看到用户1的关键字"
+ print(" ✅ 关键字数据隔离正常")
+
+ # 测试备份隔离
+ print("\n4️⃣ 测试备份数据隔离...")
+
+ # 用户1备份
+ user1_backup = db_manager.export_backup(user1_id)
+ user1_backup_cookies = [row[0] for row in user1_backup['data']['cookies']['rows']]
+
+ assert "test_cookie_1" in user1_backup_cookies, "用户1备份应该包含自己的cookies"
+ assert "test_cookie_2" in user1_backup_cookies, "用户1备份应该包含自己的cookies"
+ assert "test_cookie_3" not in user1_backup_cookies, "用户1备份不应该包含其他用户的cookies"
+ assert "test_cookie_4" not in user1_backup_cookies, "用户1备份不应该包含其他用户的cookies"
+ print(" ✅ 用户1备份隔离正常")
+
+ # 用户2备份
+ user2_backup = db_manager.export_backup(user2_id)
+ user2_backup_cookies = [row[0] for row in user2_backup['data']['cookies']['rows']]
+
+ assert "test_cookie_3" in user2_backup_cookies, "用户2备份应该包含自己的cookies"
+ assert "test_cookie_4" in user2_backup_cookies, "用户2备份应该包含自己的cookies"
+ assert "test_cookie_1" not in user2_backup_cookies, "用户2备份不应该包含其他用户的cookies"
+ assert "test_cookie_2" not in user2_backup_cookies, "用户2备份不应该包含其他用户的cookies"
+ print(" ✅ 用户2备份隔离正常")
+
+ # 清理测试数据
+ print("\n🧹 清理测试数据...")
+ db_manager.conn.execute("DELETE FROM keywords WHERE cookie_id LIKE 'test_%'")
+ db_manager.conn.execute("DELETE FROM cookies WHERE id LIKE 'test_%'")
+ db_manager.conn.execute("DELETE FROM users WHERE username IN (?, ?)", (user1_name, user2_name))
+ db_manager.conn.commit()
+ print(" ✅ 测试数据清理完成")
+
+async def test_email_sending():
+ """测试邮件发送功能(模拟)"""
+ print("\n🧪 测试邮件发送功能")
+ print("-" * 40)
+
+ print("📧 邮件发送功能测试(需要网络连接)")
+ print(" 注意:这将发送真实的邮件,请确保邮箱地址正确")
+
+ test_email = input("请输入测试邮箱地址(回车跳过): ").strip()
+
+ if test_email:
+ print(f" 正在发送测试邮件到: {test_email}")
+ code = db_manager.generate_verification_code()
+
+ try:
+ success = await db_manager.send_verification_email(test_email, code)
+ if success:
+ print(" ✅ 邮件发送成功!请检查邮箱")
+ else:
+ print(" ❌ 邮件发送失败")
+ except Exception as e:
+ print(f" ❌ 邮件发送异常: {e}")
+ else:
+ print(" ⏭️ 跳过邮件发送测试")
+
+async def main():
+ """主测试函数"""
+ print("🚀 多用户系统功能测试")
+ print("=" * 60)
+
+ try:
+ # 测试用户注册功能
+ await test_user_registration()
+
+ # 测试用户数据隔离
+ test_user_isolation()
+
+ # 测试邮件发送(可选)
+ await test_email_sending()
+
+ print("\n" + "=" * 60)
+ print("🎉 所有测试通过!多用户系统功能正常")
+
+ print("\n📋 测试总结:")
+ print("✅ 用户注册功能正常")
+ print("✅ 邮箱验证码功能正常")
+ print("✅ 用户数据隔离正常")
+ print("✅ Cookie数据隔离正常")
+ print("✅ 关键字数据隔离正常")
+ print("✅ 备份数据隔离正常")
+
+ print("\n💡 下一步:")
+ print("1. 运行迁移脚本: python migrate_to_multiuser.py")
+ print("2. 重启应用程序")
+ print("3. 访问 /register.html 测试用户注册")
+ print("4. 测试多用户登录和数据隔离")
+
+ except AssertionError as e:
+ print(f"\n❌ 测试失败: {e}")
+ return False
+ except Exception as e:
+ print(f"\n💥 测试异常: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+ return True
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/test_page_access.py b/test_page_access.py
new file mode 100644
index 0000000..f76bdae
--- /dev/null
+++ b/test_page_access.py
@@ -0,0 +1,46 @@
+#!/usr/bin/env python3
+"""
+测试页面访问
+"""
+
+import requests
+import time
+
+BASE_URL = "http://localhost:8080"
+
+def test_page_access():
+ """测试页面访问"""
+ print("🚀 测试管理员页面访问")
+ print("=" * 50)
+
+ pages = [
+ ("主页", "/"),
+ ("登录页", "/login.html"),
+ ("注册页", "/register.html"),
+ ("管理页", "/admin"),
+ ("用户管理", "/user_management.html"),
+ ("日志管理", "/log_management.html")
+ ]
+
+ for page_name, page_url in pages:
+ try:
+ print(f"测试 {page_name} ({page_url})...", end=" ")
+ response = requests.get(f"{BASE_URL}{page_url}", timeout=5)
+
+ if response.status_code == 200:
+ print(f"✅ {response.status_code}")
+ else:
+ print(f"❌ {response.status_code}")
+
+ except requests.exceptions.ConnectionError:
+ print("❌ 连接失败")
+ except requests.exceptions.Timeout:
+ print("❌ 超时")
+ except Exception as e:
+ print(f"❌ 错误: {e}")
+
+ print("\n" + "=" * 50)
+ print("测试完成!")
+
+if __name__ == "__main__":
+ test_page_access()
diff --git a/test_token_fix.py b/test_token_fix.py
new file mode 100644
index 0000000..3c76623
--- /dev/null
+++ b/test_token_fix.py
@@ -0,0 +1,146 @@
+#!/usr/bin/env python3
+"""
+测试token修复
+"""
+
+import requests
+import json
+
+BASE_URL = "http://localhost:8080"
+
+def test_admin_api_access():
+ """测试管理员API访问"""
+ print("🚀 测试管理员API访问")
+ print("=" * 50)
+
+ # 1. 管理员登录
+ print("1. 管理员登录...")
+ login_response = requests.post(f"{BASE_URL}/login",
+ json={'username': 'admin', 'password': 'admin123'})
+
+ if not login_response.json()['success']:
+ print("❌ 管理员登录失败")
+ return False
+
+ token = login_response.json()['token']
+ headers = {'Authorization': f'Bearer {token}'}
+ print(f"✅ 管理员登录成功,token: {token[:20]}...")
+
+ # 2. 测试管理员API
+ apis = [
+ ("获取用户列表", "GET", "/admin/users"),
+ ("获取系统统计", "GET", "/admin/stats"),
+ ("获取系统日志", "GET", "/admin/logs?lines=10")
+ ]
+
+ for api_name, method, endpoint in apis:
+ print(f"2. 测试 {api_name}...")
+
+ if method == "GET":
+ response = requests.get(f"{BASE_URL}{endpoint}", headers=headers)
+ else:
+ response = requests.post(f"{BASE_URL}{endpoint}", headers=headers)
+
+ if response.status_code == 200:
+ print(f" ✅ {api_name} 成功 (200)")
+
+ # 显示部分数据
+ try:
+ data = response.json()
+ if endpoint == "/admin/users":
+ users = data.get('users', [])
+ print(f" 用户数量: {len(users)}")
+ elif endpoint == "/admin/stats":
+ print(f" 总用户数: {data.get('users', {}).get('total', 0)}")
+ print(f" 总Cookie数: {data.get('cookies', {}).get('total', 0)}")
+ elif endpoint.startswith("/admin/logs"):
+ logs = data.get('logs', [])
+ print(f" 日志条数: {len(logs)}")
+ except:
+ pass
+
+ elif response.status_code == 401:
+ print(f" ❌ {api_name} 失败 - 401 未授权")
+ return False
+ elif response.status_code == 403:
+ print(f" ❌ {api_name} 失败 - 403 权限不足")
+ return False
+ else:
+ print(f" ❌ {api_name} 失败 - {response.status_code}")
+ return False
+
+ print("\n✅ 所有管理员API测试通过!")
+ return True
+
+def test_non_admin_access():
+ """测试非管理员访问"""
+ print("\n🔒 测试非管理员访问限制")
+ print("=" * 50)
+
+ # 使用无效token测试
+ fake_headers = {'Authorization': 'Bearer invalid_token'}
+
+ response = requests.get(f"{BASE_URL}/admin/users", headers=fake_headers)
+ if response.status_code == 401:
+ print("✅ 无效token被正确拒绝 (401)")
+ else:
+ print(f"❌ 无效token未被拒绝 ({response.status_code})")
+ return False
+
+ # 测试无token访问
+ response = requests.get(f"{BASE_URL}/admin/users")
+ if response.status_code == 401:
+ print("✅ 无token访问被正确拒绝 (401)")
+ else:
+ print(f"❌ 无token访问未被拒绝 ({response.status_code})")
+ return False
+
+ return True
+
+def main():
+ """主测试函数"""
+ print("🔧 Token修复验证测试")
+ print("=" * 60)
+
+ print("📋 测试目标:")
+ print("• 验证管理员可以正常访问API")
+ print("• 验证token认证正常工作")
+ print("• 验证非管理员访问被拒绝")
+
+ try:
+ # 测试管理员API访问
+ admin_success = test_admin_api_access()
+
+ # 测试非管理员访问限制
+ security_success = test_non_admin_access()
+
+ print("\n" + "=" * 60)
+
+ if admin_success and security_success:
+ print("🎉 Token修复验证成功!")
+
+ print("\n💡 现在可以正常使用:")
+ print("1. 使用admin账号登录主页")
+ print("2. 点击侧边栏的'用户管理'")
+ print("3. 点击侧边栏的'系统日志'")
+ print("4. 所有管理员功能都应该正常工作")
+
+ print("\n🔑 Token存储统一:")
+ print("• 登录页面: 设置 'auth_token'")
+ print("• 主页面: 读取 'auth_token'")
+ print("• 管理员页面: 读取 'auth_token'")
+ print("• 所有页面现在使用统一的token key")
+
+ return True
+ else:
+ print("❌ Token修复验证失败!")
+ return False
+
+ except Exception as e:
+ print(f"💥 测试异常: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+if __name__ == "__main__":
+ main()
diff --git a/test_user_isolation_complete.py b/test_user_isolation_complete.py
new file mode 100644
index 0000000..a5b3e98
--- /dev/null
+++ b/test_user_isolation_complete.py
@@ -0,0 +1,333 @@
+#!/usr/bin/env python3
+"""
+完整的多用户数据隔离测试
+"""
+
+import requests
+import json
+import sqlite3
+import time
+
+BASE_URL = "http://localhost:8080"
+
+def create_test_users():
+ """创建测试用户"""
+ print("🧪 创建测试用户")
+ print("-" * 40)
+
+ users = [
+ {"username": "testuser1", "email": "user1@test.com", "password": "test123456"},
+ {"username": "testuser2", "email": "user2@test.com", "password": "test123456"}
+ ]
+
+ created_users = []
+
+ for user in users:
+ try:
+ # 清理可能存在的用户
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+ cursor.execute('DELETE FROM users WHERE username = ? OR email = ?', (user['username'], user['email']))
+ cursor.execute('DELETE FROM email_verifications WHERE email = ?', (user['email'],))
+ conn.commit()
+ conn.close()
+
+ # 生成验证码
+ session_id = f"test_{user['username']}_{int(time.time())}"
+
+ # 生成图形验证码
+ captcha_response = requests.post(f"{BASE_URL}/generate-captcha",
+ json={'session_id': session_id})
+ if not captcha_response.json()['success']:
+ print(f" ❌ {user['username']}: 图形验证码生成失败")
+ continue
+
+ # 获取图形验证码
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+ cursor.execute('SELECT code FROM captcha_codes WHERE session_id = ? ORDER BY created_at DESC LIMIT 1',
+ (session_id,))
+ captcha_result = cursor.fetchone()
+ conn.close()
+
+ if not captcha_result:
+ print(f" ❌ {user['username']}: 无法获取图形验证码")
+ continue
+
+ captcha_code = captcha_result[0]
+
+ # 验证图形验证码
+ verify_response = requests.post(f"{BASE_URL}/verify-captcha",
+ json={'session_id': session_id, 'captcha_code': captcha_code})
+ if not verify_response.json()['success']:
+ print(f" ❌ {user['username']}: 图形验证码验证失败")
+ continue
+
+ # 发送邮箱验证码
+ email_response = requests.post(f"{BASE_URL}/send-verification-code",
+ json={'email': user['email'], 'session_id': session_id})
+ if not email_response.json()['success']:
+ print(f" ❌ {user['username']}: 邮箱验证码发送失败")
+ continue
+
+ # 获取邮箱验证码
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+ cursor.execute('SELECT code FROM email_verifications WHERE email = ? ORDER BY created_at DESC LIMIT 1',
+ (user['email'],))
+ email_result = cursor.fetchone()
+ conn.close()
+
+ if not email_result:
+ print(f" ❌ {user['username']}: 无法获取邮箱验证码")
+ continue
+
+ email_code = email_result[0]
+
+ # 注册用户
+ register_response = requests.post(f"{BASE_URL}/register",
+ json={
+ 'username': user['username'],
+ 'email': user['email'],
+ 'verification_code': email_code,
+ 'password': user['password']
+ })
+
+ if register_response.json()['success']:
+ print(f" ✅ {user['username']}: 注册成功")
+
+ # 登录获取token
+ login_response = requests.post(f"{BASE_URL}/login",
+ json={'username': user['username'], 'password': user['password']})
+
+ if login_response.json()['success']:
+ token = login_response.json()['token']
+ user_id = login_response.json()['user_id']
+ created_users.append({
+ 'username': user['username'],
+ 'user_id': user_id,
+ 'token': token,
+ 'headers': {'Authorization': f'Bearer {token}'}
+ })
+ print(f" ✅ {user['username']}: 登录成功,用户ID: {user_id}")
+ else:
+ print(f" ❌ {user['username']}: 登录失败")
+ else:
+ print(f" ❌ {user['username']}: 注册失败 - {register_response.json()['message']}")
+
+ except Exception as e:
+ print(f" ❌ {user['username']}: 创建失败 - {e}")
+
+ return created_users
+
+def test_cookie_isolation(users):
+ """测试Cookie数据隔离"""
+ print("\n🧪 测试Cookie数据隔离")
+ print("-" * 40)
+
+ if len(users) < 2:
+ print("❌ 需要至少2个用户进行隔离测试")
+ return False
+
+ user1, user2 = users[0], users[1]
+
+ # 用户1添加cookies
+ print(f"1️⃣ {user1['username']} 添加cookies...")
+ cookies1 = [
+ {"id": "test_cookie_user1_1", "value": "cookie_value_1"},
+ {"id": "test_cookie_user1_2", "value": "cookie_value_2"}
+ ]
+
+ for cookie in cookies1:
+ response = requests.post(f"{BASE_URL}/cookies",
+ json=cookie,
+ headers=user1['headers'])
+ if response.status_code == 200:
+ print(f" ✅ 添加cookie: {cookie['id']}")
+ else:
+ print(f" ❌ 添加cookie失败: {cookie['id']}")
+
+ # 用户2添加cookies
+ print(f"\n2️⃣ {user2['username']} 添加cookies...")
+ cookies2 = [
+ {"id": "test_cookie_user2_1", "value": "cookie_value_3"},
+ {"id": "test_cookie_user2_2", "value": "cookie_value_4"}
+ ]
+
+ for cookie in cookies2:
+ response = requests.post(f"{BASE_URL}/cookies",
+ json=cookie,
+ headers=user2['headers'])
+ if response.status_code == 200:
+ print(f" ✅ 添加cookie: {cookie['id']}")
+ else:
+ print(f" ❌ 添加cookie失败: {cookie['id']}")
+
+ # 验证用户1只能看到自己的cookies
+ print(f"\n3️⃣ 验证 {user1['username']} 的cookie隔离...")
+ response1 = requests.get(f"{BASE_URL}/cookies", headers=user1['headers'])
+ if response1.status_code == 200:
+ user1_cookies = response1.json()
+ user1_cookie_ids = set(user1_cookies)
+ expected_user1 = {"test_cookie_user1_1", "test_cookie_user1_2"}
+
+ if expected_user1.issubset(user1_cookie_ids):
+ print(f" ✅ {user1['username']} 能看到自己的cookies")
+ else:
+ print(f" ❌ {user1['username']} 看不到自己的cookies")
+
+ if "test_cookie_user2_1" not in user1_cookie_ids and "test_cookie_user2_2" not in user1_cookie_ids:
+ print(f" ✅ {user1['username']} 看不到其他用户的cookies")
+ else:
+ print(f" ❌ {user1['username']} 能看到其他用户的cookies(隔离失败)")
+
+ # 验证用户2只能看到自己的cookies
+ print(f"\n4️⃣ 验证 {user2['username']} 的cookie隔离...")
+ response2 = requests.get(f"{BASE_URL}/cookies", headers=user2['headers'])
+ if response2.status_code == 200:
+ user2_cookies = response2.json()
+ user2_cookie_ids = set(user2_cookies)
+ expected_user2 = {"test_cookie_user2_1", "test_cookie_user2_2"}
+
+ if expected_user2.issubset(user2_cookie_ids):
+ print(f" ✅ {user2['username']} 能看到自己的cookies")
+ else:
+ print(f" ❌ {user2['username']} 看不到自己的cookies")
+
+ if "test_cookie_user1_1" not in user2_cookie_ids and "test_cookie_user1_2" not in user2_cookie_ids:
+ print(f" ✅ {user2['username']} 看不到其他用户的cookies")
+ else:
+ print(f" ❌ {user2['username']} 能看到其他用户的cookies(隔离失败)")
+
+ return True
+
+def test_cross_user_access(users):
+ """测试跨用户访问权限"""
+ print("\n🧪 测试跨用户访问权限")
+ print("-" * 40)
+
+ if len(users) < 2:
+ print("❌ 需要至少2个用户进行权限测试")
+ return False
+
+ user1, user2 = users[0], users[1]
+
+ # 用户1尝试访问用户2的cookie
+ print(f"1️⃣ {user1['username']} 尝试访问 {user2['username']} 的cookie...")
+
+ # 尝试获取用户2的关键字
+ response = requests.get(f"{BASE_URL}/keywords/test_cookie_user2_1", headers=user1['headers'])
+ if response.status_code == 403:
+ print(f" ✅ 跨用户访问被正确拒绝 (403)")
+ elif response.status_code == 404:
+ print(f" ✅ 跨用户访问被拒绝 (404)")
+ else:
+ print(f" ❌ 跨用户访问未被拒绝 (状态码: {response.status_code})")
+
+ # 尝试更新用户2的cookie状态
+ response = requests.put(f"{BASE_URL}/cookies/test_cookie_user2_1/status",
+ json={"enabled": False},
+ headers=user1['headers'])
+ if response.status_code == 403:
+ print(f" ✅ 跨用户操作被正确拒绝 (403)")
+ elif response.status_code == 404:
+ print(f" ✅ 跨用户操作被拒绝 (404)")
+ else:
+ print(f" ❌ 跨用户操作未被拒绝 (状态码: {response.status_code})")
+
+ return True
+
+def cleanup_test_data(users):
+ """清理测试数据"""
+ print("\n🧹 清理测试数据")
+ print("-" * 40)
+
+ try:
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+
+ # 清理测试cookies
+ cursor.execute('DELETE FROM cookies WHERE id LIKE "test_cookie_%"')
+ cookie_count = cursor.rowcount
+
+ # 清理测试用户
+ test_usernames = [user['username'] for user in users]
+ if test_usernames:
+ placeholders = ','.join(['?' for _ in test_usernames])
+ cursor.execute(f'DELETE FROM users WHERE username IN ({placeholders})', test_usernames)
+ user_count = cursor.rowcount
+ else:
+ user_count = 0
+
+ # 清理测试验证码
+ cursor.execute('DELETE FROM email_verifications WHERE email LIKE "%@test.com"')
+ email_count = cursor.rowcount
+
+ cursor.execute('DELETE FROM captcha_codes WHERE session_id LIKE "test_%"')
+ captcha_count = cursor.rowcount
+
+ conn.commit()
+ conn.close()
+
+ print(f"✅ 清理完成:")
+ print(f" • 测试cookies: {cookie_count} 条")
+ print(f" • 测试用户: {user_count} 条")
+ print(f" • 邮箱验证码: {email_count} 条")
+ print(f" • 图形验证码: {captcha_count} 条")
+
+ except Exception as e:
+ print(f"❌ 清理失败: {e}")
+
+def main():
+ """主测试函数"""
+ print("🚀 多用户数据隔离完整测试")
+ print("=" * 60)
+
+ try:
+ # 创建测试用户
+ users = create_test_users()
+
+ if len(users) < 2:
+ print("\n❌ 测试失败:无法创建足够的测试用户")
+ return False
+
+ print(f"\n✅ 成功创建 {len(users)} 个测试用户")
+
+ # 测试Cookie数据隔离
+ cookie_isolation_success = test_cookie_isolation(users)
+
+ # 测试跨用户访问权限
+ cross_access_success = test_cross_user_access(users)
+
+ # 清理测试数据
+ cleanup_test_data(users)
+
+ print("\n" + "=" * 60)
+ if cookie_isolation_success and cross_access_success:
+ print("🎉 多用户数据隔离测试全部通过!")
+
+ print("\n📋 测试总结:")
+ print("✅ 用户注册和登录功能正常")
+ print("✅ Cookie数据完全隔离")
+ print("✅ 跨用户访问被正确拒绝")
+ print("✅ 用户权限验证正常")
+
+ print("\n🔒 安全特性:")
+ print("• 每个用户只能看到自己的数据")
+ print("• 跨用户访问被严格禁止")
+ print("• API层面权限验证完整")
+ print("• 数据库层面用户绑定正确")
+
+ return True
+ else:
+ print("❌ 多用户数据隔离测试失败!")
+ return False
+
+ except Exception as e:
+ print(f"\n💥 测试异常: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+if __name__ == "__main__":
+ main()
diff --git a/test_user_logging.py b/test_user_logging.py
new file mode 100644
index 0000000..cd030c3
--- /dev/null
+++ b/test_user_logging.py
@@ -0,0 +1,318 @@
+#!/usr/bin/env python3
+"""
+测试用户日志显示功能
+"""
+
+import requests
+import json
+import time
+import sqlite3
+from loguru import logger
+
+BASE_URL = "http://localhost:8080"
+
+def create_test_user():
+ """创建测试用户"""
+ logger.info("创建测试用户...")
+
+ user_data = {
+ "username": "logtest_user",
+ "email": "logtest@test.com",
+ "password": "test123456"
+ }
+
+ try:
+ # 清理可能存在的用户
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+ cursor.execute('DELETE FROM users WHERE username = ? OR email = ?', (user_data['username'], user_data['email']))
+ cursor.execute('DELETE FROM email_verifications WHERE email = ?', (user_data['email'],))
+ conn.commit()
+ conn.close()
+
+ # 生成验证码
+ session_id = f"logtest_{int(time.time())}"
+
+ # 生成图形验证码
+ captcha_response = requests.post(f"{BASE_URL}/generate-captcha",
+ json={'session_id': session_id})
+ if not captcha_response.json()['success']:
+ logger.error("图形验证码生成失败")
+ return None
+
+ # 获取图形验证码
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+ cursor.execute('SELECT code FROM captcha_codes WHERE session_id = ? ORDER BY created_at DESC LIMIT 1',
+ (session_id,))
+ captcha_result = cursor.fetchone()
+ conn.close()
+
+ if not captcha_result:
+ logger.error("无法获取图形验证码")
+ return None
+
+ captcha_code = captcha_result[0]
+
+ # 验证图形验证码
+ verify_response = requests.post(f"{BASE_URL}/verify-captcha",
+ json={'session_id': session_id, 'captcha_code': captcha_code})
+ if not verify_response.json()['success']:
+ logger.error("图形验证码验证失败")
+ return None
+
+ # 发送邮箱验证码
+ email_response = requests.post(f"{BASE_URL}/send-verification-code",
+ json={'email': user_data['email'], 'session_id': session_id})
+ if not email_response.json()['success']:
+ logger.error("邮箱验证码发送失败")
+ return None
+
+ # 获取邮箱验证码
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+ cursor.execute('SELECT code FROM email_verifications WHERE email = ? ORDER BY created_at DESC LIMIT 1',
+ (user_data['email'],))
+ email_result = cursor.fetchone()
+ conn.close()
+
+ if not email_result:
+ logger.error("无法获取邮箱验证码")
+ return None
+
+ email_code = email_result[0]
+
+ # 注册用户
+ register_response = requests.post(f"{BASE_URL}/register",
+ json={
+ 'username': user_data['username'],
+ 'email': user_data['email'],
+ 'verification_code': email_code,
+ 'password': user_data['password']
+ })
+
+ if register_response.json()['success']:
+ logger.info(f"用户注册成功: {user_data['username']}")
+
+ # 登录获取token
+ login_response = requests.post(f"{BASE_URL}/login",
+ json={'username': user_data['username'], 'password': user_data['password']})
+
+ if login_response.json()['success']:
+ token = login_response.json()['token']
+ user_id = login_response.json()['user_id']
+ return {
+ 'username': user_data['username'],
+ 'user_id': user_id,
+ 'token': token,
+ 'headers': {'Authorization': f'Bearer {token}'}
+ }
+ else:
+ logger.error("用户登录失败")
+ return None
+ else:
+ logger.error(f"用户注册失败: {register_response.json()['message']}")
+ return None
+
+ except Exception as e:
+ logger.error(f"创建用户失败: {e}")
+ return None
+
+def test_user_operations(user):
+ """测试用户操作的日志显示"""
+ logger.info("测试用户操作的日志显示...")
+
+ print(f"\n🧪 开始测试用户 {user['username']} 的操作日志")
+ print("请观察服务器日志,应该显示用户信息...")
+ print("-" * 50)
+
+ # 1. 测试Cookie操作
+ print("1️⃣ 测试Cookie操作...")
+ cookie_data = {
+ "id": "logtest_cookie",
+ "value": "test_cookie_value"
+ }
+
+ response = requests.post(f"{BASE_URL}/cookies", json=cookie_data, headers=user['headers'])
+ if response.status_code == 200:
+ print(" ✅ Cookie添加成功")
+ else:
+ print(f" ❌ Cookie添加失败: {response.text}")
+
+ # 2. 测试获取Cookie列表
+ print("2️⃣ 测试获取Cookie列表...")
+ response = requests.get(f"{BASE_URL}/cookies", headers=user['headers'])
+ if response.status_code == 200:
+ print(" ✅ 获取Cookie列表成功")
+ else:
+ print(f" ❌ 获取Cookie列表失败: {response.text}")
+
+ # 3. 测试关键字操作
+ print("3️⃣ 测试关键字操作...")
+ keywords_data = {
+ "keywords": {
+ "你好": "您好,欢迎咨询!",
+ "价格": "价格请看商品详情"
+ }
+ }
+
+ response = requests.post(f"{BASE_URL}/keywords/logtest_cookie",
+ json=keywords_data, headers=user['headers'])
+ if response.status_code == 200:
+ print(" ✅ 关键字更新成功")
+ else:
+ print(f" ❌ 关键字更新失败: {response.text}")
+
+ # 4. 测试卡券操作
+ print("4️⃣ 测试卡券操作...")
+ card_data = {
+ "name": "测试卡券",
+ "type": "text",
+ "text_content": "这是一个测试卡券",
+ "description": "用于测试日志显示的卡券"
+ }
+
+ response = requests.post(f"{BASE_URL}/cards", json=card_data, headers=user['headers'])
+ if response.status_code == 200:
+ print(" ✅ 卡券创建成功")
+ else:
+ print(f" ❌ 卡券创建失败: {response.text}")
+
+ # 5. 测试用户设置
+ print("5️⃣ 测试用户设置...")
+ setting_data = {
+ "value": "#ff6600",
+ "description": "测试主题颜色"
+ }
+
+ response = requests.put(f"{BASE_URL}/user-settings/theme_color",
+ json=setting_data, headers=user['headers'])
+ if response.status_code == 200:
+ print(" ✅ 用户设置更新成功")
+ else:
+ print(f" ❌ 用户设置更新失败: {response.text}")
+
+ # 6. 测试权限验证(尝试访问不存在的Cookie)
+ print("6️⃣ 测试权限验证...")
+ response = requests.get(f"{BASE_URL}/keywords/nonexistent_cookie", headers=user['headers'])
+ if response.status_code == 403:
+ print(" ✅ 权限验证正常(403错误)")
+ else:
+ print(f" ⚠️ 权限验证结果: {response.status_code}")
+
+ print("-" * 50)
+ print("🎯 操作测试完成,请检查服务器日志中的用户信息显示")
+
+def test_admin_operations():
+ """测试管理员操作"""
+ print("\n🔧 测试管理员操作...")
+
+ # 管理员登录
+ admin_login = requests.post(f"{BASE_URL}/login",
+ json={'username': 'admin', 'password': 'admin123'})
+
+ if admin_login.json()['success']:
+ admin_token = admin_login.json()['token']
+ admin_headers = {'Authorization': f'Bearer {admin_token}'}
+
+ print(" ✅ 管理员登录成功")
+
+ # 测试管理员获取Cookie列表
+ response = requests.get(f"{BASE_URL}/cookies", headers=admin_headers)
+ if response.status_code == 200:
+ print(" ✅ 管理员获取Cookie列表成功")
+ else:
+ print(f" ❌ 管理员获取Cookie列表失败: {response.text}")
+ else:
+ print(" ❌ 管理员登录失败")
+
+def cleanup_test_data(user):
+ """清理测试数据"""
+ logger.info("清理测试数据...")
+
+ try:
+ conn = sqlite3.connect('xianyu_data.db')
+ cursor = conn.cursor()
+
+ # 清理测试用户
+ cursor.execute('DELETE FROM users WHERE username = ?', (user['username'],))
+ user_count = cursor.rowcount
+
+ # 清理测试Cookie
+ cursor.execute('DELETE FROM cookies WHERE id = "logtest_cookie"')
+ cookie_count = cursor.rowcount
+
+ # 清理测试卡券
+ cursor.execute('DELETE FROM cards WHERE name = "测试卡券"')
+ card_count = cursor.rowcount
+
+ # 清理测试验证码
+ cursor.execute('DELETE FROM email_verifications WHERE email = "logtest@test.com"')
+ email_count = cursor.rowcount
+
+ cursor.execute('DELETE FROM captcha_codes WHERE session_id LIKE "logtest_%"')
+ captcha_count = cursor.rowcount
+
+ conn.commit()
+ conn.close()
+
+ logger.info(f"清理完成: 用户{user_count}个, Cookie{cookie_count}个, 卡券{card_count}个, 邮箱验证码{email_count}个, 图形验证码{captcha_count}个")
+
+ except Exception as e:
+ logger.error(f"清理失败: {e}")
+
+def main():
+ """主测试函数"""
+ print("🚀 用户日志显示功能测试")
+ print("=" * 60)
+
+ print("📋 测试目标:")
+ print("• 验证API请求日志显示用户信息")
+ print("• 验证业务操作日志显示用户信息")
+ print("• 验证权限验证日志显示用户信息")
+ print("• 验证管理员操作日志显示")
+
+ try:
+ # 创建测试用户
+ user = create_test_user()
+
+ if not user:
+ print("❌ 测试失败:无法创建测试用户")
+ return False
+
+ print(f"✅ 成功创建测试用户: {user['username']}")
+
+ # 测试用户操作
+ test_user_operations(user)
+
+ # 测试管理员操作
+ test_admin_operations()
+
+ # 清理测试数据
+ cleanup_test_data(user)
+
+ print("\n" + "=" * 60)
+ print("🎉 用户日志显示功能测试完成!")
+
+ print("\n📋 检查要点:")
+ print("✅ 1. API请求日志应显示: 【用户名#用户ID】")
+ print("✅ 2. 业务操作日志应显示用户信息")
+ print("✅ 3. 权限验证日志应显示操作用户")
+ print("✅ 4. 管理员操作应显示: 【admin#1】")
+
+ print("\n💡 日志格式示例:")
+ print("🌐 【logtest_user#2】 API请求: POST /cookies")
+ print("✅ 【logtest_user#2】 API响应: POST /cookies - 200 (0.005s)")
+ print("📝 【logtest_user#2】 尝试添加Cookie: logtest_cookie")
+ print("✅ 【logtest_user#2】 Cookie添加成功: logtest_cookie")
+
+ return True
+
+ except Exception as e:
+ print(f"💥 测试异常: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+if __name__ == "__main__":
+ main()