xianyu-auto-reply/utils/qr_login.py
Hou Yuxi 39782cdfdb
Update qr_login.py
针对网络慢以及代理情况下获取二维码异常修复
2025-08-07 23:18:43 +08:00

452 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
闲鱼扫码登录工具
基于API接口实现二维码生成和Cookie获取参照myfish-main项目
"""
import asyncio
import time
import uuid
import json
import re
from random import random
from typing import Optional, Dict, Any
import httpx
import qrcode
import qrcode.constants
from loguru import logger
import hashlib
def generate_headers():
    """Build the default browser-like HTTP headers used for passport requests.

    Returns:
        A new dict on every call, so callers may mutate it freely.
    """
    # Headers describing the (spoofed) browser client.
    client_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
    }
    # Fetch-metadata headers for a same-origin CORS request.
    fetch_metadata = {
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
    }
    # Headers tying the request to the passport site.
    origin_headers = {
        'Referer': 'https://passport.goofish.com/',
        'Origin': 'https://passport.goofish.com',
    }
    # Merge in the original key order.
    return {**client_headers, **fetch_metadata, **origin_headers}
class GetLoginParamsError(Exception):
    """Raised when the login form parameters cannot be obtained."""
class GetLoginQRCodeError(Exception):
    """Raised when the login QR code cannot be obtained."""
class NotLoginError(Exception):
    """Error type for a "not logged in" condition."""
class QRLoginSession:
    """State container for a single QR-code login attempt."""

    def __init__(self, session_id: str):
        """Create a fresh session in the ``'waiting'`` state.

        Args:
            session_id: Identifier under which this session is tracked.
        """
        self.session_id = session_id
        # One of: waiting, scanned, success, expired, cancelled,
        # verification_required.
        self.status = 'waiting'
        self.qr_code_url = None
        self.qr_content = None
        self.cookies = {}
        self.unb = None
        self.created_time = time.time()
        self.expire_time = 300  # lifetime in seconds (5 minutes)
        self.params = {}  # login form parameters
        self.verification_url = None  # risk-control verification URL

    def is_expired(self) -> bool:
        """Return True once the session is older than ``expire_time`` seconds."""
        age = time.time() - self.created_time
        return age > self.expire_time

    def to_dict(self) -> Dict[str, Any]:
        """Return a summary dict of the session's public state."""
        exported = ('session_id', 'status', 'qr_code_url', 'created_time')
        summary = {name: getattr(self, name) for name in exported}
        summary['is_expired'] = self.is_expired()
        return summary
class QRLoginManager:
    """Create QR-code login sessions and track them until they finish.

    Each session is stored in ``self.sessions`` under its session id. A
    background task polls the scan-status endpoint and updates the session's
    ``status`` (and, on success, its cookies and ``unb``).
    """

    def __init__(self):
        """Set up endpoints, default headers, proxy and timeout settings."""
        # Active sessions keyed by session id.
        self.sessions: Dict[str, QRLoginSession] = {}
        # Strong references to running monitor tasks. The event loop only
        # keeps weak references, so an unreferenced task could be
        # garbage-collected before it finishes.
        self._monitor_tasks: set = set()
        self.headers = generate_headers()
        self.host = "https://passport.goofish.com"
        self.api_mini_login = f"{self.host}/mini_login.htm"
        self.api_generate_qr = f"{self.host}/newlogin/qrcode/generate.do"
        self.api_scan_status = f"{self.host}/newlogin/qrcode/query.do"
        self.api_h5_tk = "https://h5api.m.goofish.com/h5/mtop.gaia.nodejs.gaia.idle.data.gw.v2.index.get/1.0/"
        # Optional proxy URL (for example "http://127.0.0.1:7890"); None
        # means no proxy is configured here.
        self.proxy = None
        # Generous timeouts to cope with slow networks and proxies.
        self.timeout = httpx.Timeout(connect=30.0, read=60.0, write=30.0, pool=60.0)

    def _cookie_marshal(self, cookies: dict) -> str:
        """Serialize a cookie dict into a ``k1=v1; k2=v2`` string."""
        return "; ".join(f"{k}={v}" for k, v in cookies.items())

    async def _get_mh5tk(self, session: QRLoginSession) -> dict:
        """Fetch the ``m_h5_tk`` cookies and store them on the session.

        A first GET collects the cookies; a signed POST is then sent to the
        same endpoint using those cookies.

        Args:
            session: Session whose ``cookies`` dict is updated in place.

        Returns:
            The cookies returned by the initial GET request.

        Raises:
            httpx.ConnectTimeout, httpx.ReadTimeout, httpx.ConnectError:
                Logged and re-raised.
        """
        data_str = json.dumps({"bizScene": "home"}, separators=(',', ':'))
        t = str(int(time.time() * 1000))
        app_key = "34839810"

        async with httpx.AsyncClient(timeout=self.timeout, follow_redirects=True, proxy=self.proxy) as client:
            try:
                # First request: a plain GET to obtain the m_h5_tk cookie.
                resp = await client.get(self.api_h5_tk, headers=self.headers)
                cookies = dict(resp.cookies.items())
                session.cookies.update(cookies)

                # The token is the part of m_h5_tk before the first "_".
                m_h5_tk = cookies.get("m_h5_tk", "")
                token = m_h5_tk.split("_")[0] if "_" in m_h5_tk else ""

                # Signature: md5 of "token&t&appKey&data".
                sign_input = f"{token}&{t}&{app_key}&{data_str}"
                sign = hashlib.md5(sign_input.encode()).hexdigest()

                params = {
                    "jsv": "2.7.2",
                    "appKey": app_key,
                    "t": t,
                    "sign": sign,
                    "v": "1.0",
                    "type": "originaljson",
                    "dataType": "json",
                    "timeout": 20000,
                    "api": "mtop.gaia.nodejs.gaia.idle.data.gw.v2.index.get",
                    "data": data_str,
                }
                # Second request: the signed call, sent with the cookies
                # collected so far.
                await client.post(self.api_h5_tk, params=params, headers=self.headers, cookies=session.cookies)
                return cookies
            except httpx.ConnectTimeout:
                logger.error("获取m_h5_tk时连接超时")
                raise
            except httpx.ReadTimeout:
                logger.error("获取m_h5_tk时读取超时")
                raise
            except httpx.ConnectError:
                logger.error("获取m_h5_tk时连接错误")
                raise

    async def _get_login_params(self, session: QRLoginSession) -> dict:
        """Fetch the form parameters required for QR-code login.

        The parameters are extracted from the ``window.viewData`` JSON
        embedded in the mini-login page and merged into ``session.params``.

        Args:
            session: Session whose cookies are sent and whose ``params``
                dict is updated in place.

        Returns:
            The ``loginFormData`` dict, with ``umidTag`` set to ``"SERVER"``.

        Raises:
            GetLoginParamsError: The page has no ``window.viewData`` block,
                or it has no ``loginFormData``.
            httpx.ConnectTimeout, httpx.ReadTimeout, httpx.ConnectError:
                Logged and re-raised.
        """
        params = {
            "lang": "zh_cn",
            "appName": "xianyu",
            "appEntrance": "web",
            "styleType": "vertical",
            "bizParams": "",
            "notLoadSsoView": False,
            "notKeepLogin": False,
            "isMobile": False,
            "qrCodeFirst": False,
            "stie": 77,
            "rnd": random(),
        }

        async with httpx.AsyncClient(follow_redirects=True, timeout=self.timeout, proxy=self.proxy) as client:
            try:
                resp = await client.get(
                    self.api_mini_login,
                    params=params,
                    cookies=session.cookies,
                    headers=self.headers,
                )

                # Pull the embedded JSON object out of the page source.
                pattern = r"window\.viewData\s*=\s*(\{.*?\});"
                match = re.search(pattern, resp.text)
                if not match:
                    raise GetLoginParamsError("获取登录参数失败")

                view_data = json.loads(match.group(1))
                data = view_data.get("loginFormData")
                if not data:
                    raise GetLoginParamsError("未找到loginFormData")

                data["umidTag"] = "SERVER"
                session.params.update(data)
                return data
            except httpx.ConnectTimeout:
                logger.error("获取登录参数时连接超时")
                raise
            except httpx.ReadTimeout:
                logger.error("获取登录参数时读取超时")
                raise
            except httpx.ConnectError:
                logger.error("获取登录参数时连接错误")
                raise

    @staticmethod
    def _qr_to_data_url(qr_content: str) -> str:
        """Render ``qr_content`` as a PNG QR code and return it as a data URL."""
        import base64
        from io import BytesIO

        qr = qrcode.QRCode(
            version=5,
            error_correction=qrcode.constants.ERROR_CORRECT_L,
            box_size=10,
            border=2,
        )
        qr.add_data(qr_content)
        qr.make()

        buffer = BytesIO()
        qr.make_image().save(buffer, format='PNG')
        qr_base64 = base64.b64encode(buffer.getvalue()).decode()
        return f"data:image/png;base64,{qr_base64}"

    async def generate_qr_code(self) -> Dict[str, Any]:
        """Create a new login session and generate its QR code.

        On success the session is stored in ``self.sessions`` and a
        background task starts monitoring its scan status.

        Returns:
            ``{'success': True, 'session_id': ..., 'qr_code_url': ...}`` on
            success, where ``qr_code_url`` is a base64 PNG data URL;
            ``{'success': False, 'message': ...}`` on any failure. This
            method does not raise for request or parsing errors.
        """
        try:
            session_id = str(uuid.uuid4())
            session = QRLoginSession(session_id)

            # 1. Obtain the m_h5_tk cookies.
            await self._get_mh5tk(session)
            logger.info(f"获取m_h5_tk成功: {session_id}")

            # 2. Obtain the login form parameters.
            login_params = await self._get_login_params(session)
            logger.info(f"获取登录参数成功: {session_id}")

            # 3. Ask the server to generate the QR code.
            async with httpx.AsyncClient(follow_redirects=True, timeout=self.timeout, proxy=self.proxy) as client:
                resp = await client.get(
                    self.api_generate_qr,
                    params=login_params,
                    headers=self.headers
                )
                logger.debug(f"[调试] 获取二维码接口原始响应: {resp.text}")

                try:
                    results = resp.json()
                    logger.debug(f"[调试] 获取二维码接口解析后: {json.dumps(results, ensure_ascii=False)}")
                except Exception as e:
                    logger.exception("二维码接口返回不是JSON")
                    # Chain the cause so the parsing error is not lost.
                    raise GetLoginQRCodeError(f"二维码接口返回异常: {resp.text}") from e

                content = results.get("content", {})
                if content.get("success") is not True:
                    raise GetLoginQRCodeError("获取登录二维码失败")

                qr_data = content["data"]
                # The status-polling request needs these two values.
                session.params.update({
                    "t": qr_data["t"],
                    "ck": qr_data["ck"],
                })

                qr_content = qr_data["codeContent"]
                session.qr_content = qr_content

                qr_data_url = self._qr_to_data_url(qr_content)
                session.qr_code_url = qr_data_url
                session.status = 'waiting'

                self.sessions[session_id] = session

                # Start monitoring, keeping a strong reference to the task
                # until it completes.
                task = asyncio.create_task(self._monitor_qr_status(session_id))
                self._monitor_tasks.add(task)
                task.add_done_callback(self._monitor_tasks.discard)

                logger.info(f"二维码生成成功: {session_id}")
                return {
                    'success': True,
                    'session_id': session_id,
                    'qr_code_url': qr_data_url
                }
        except httpx.ConnectTimeout as e:
            logger.error(f"连接超时: {e}")
            return {'success': False, 'message': '连接超时,请检查网络或尝试使用代理'}
        except httpx.ReadTimeout as e:
            logger.error(f"读取超时: {e}")
            return {'success': False, 'message': '读取超时,服务器响应过慢'}
        except httpx.ConnectError as e:
            logger.error(f"连接错误: {e}")
            return {'success': False, 'message': '连接错误,请检查网络或代理设置'}
        except Exception as e:
            logger.exception("二维码生成过程中发生异常")
            return {'success': False, 'message': f'生成二维码失败: {str(e)}'}

    async def _poll_qrcode_status(self, session: QRLoginSession) -> httpx.Response:
        """POST the session's parameters to the scan-status endpoint once."""
        async with httpx.AsyncClient(follow_redirects=True, timeout=self.timeout, proxy=self.proxy) as client:
            resp = await client.post(
                self.api_scan_status,
                data=session.params,
                cookies=session.cookies,
                headers=self.headers,
            )
            return resp

    async def _monitor_qr_status(self, session_id: str):
        """Poll the scan status of a session until it reaches a final state.

        Polls about every 0.8 seconds for at most 300 seconds, and stops
        early if the session is removed from ``self.sessions``. A failed
        poll is logged and retried after 2 seconds. If no final state is
        reached in time, the session is marked ``'expired'``.

        Args:
            session_id: Id of the session to monitor; returns immediately if
                it is not in ``self.sessions``.
        """
        try:
            session = self.sessions.get(session_id)
            if not session:
                return

            logger.info(f"开始监控二维码状态: {session_id}")

            max_wait_time = 300  # seconds (5 minutes)
            start_time = time.time()

            while time.time() - start_time < max_wait_time:
                try:
                    # Stop if the session was removed in the meantime.
                    if session_id not in self.sessions:
                        break

                    resp = await self._poll_qrcode_status(session)
                    # Parse the response body once and reuse it.
                    status_data = resp.json().get("content", {}).get("data", {})
                    qrcode_status = status_data.get("qrCodeStatus")

                    if qrcode_status == "CONFIRMED":
                        if status_data.get("iframeRedirect") is True:
                            # Risk control: phone verification is required.
                            session.status = 'verification_required'
                            iframe_url = status_data.get("iframeRedirectUrl")
                            session.verification_url = iframe_url
                            logger.warning(f"账号被风控,需要手机验证: {session_id}, URL: {iframe_url}")
                            break

                        # Login succeeded: store the response cookies.
                        session.status = 'success'
                        for k, v in resp.cookies.items():
                            session.cookies[k] = v
                            if k == 'unb':
                                session.unb = v
                        logger.info(f"扫码登录成功: {session_id}, UNB: {session.unb}")
                        break
                    elif qrcode_status == "NEW":
                        # Not scanned yet. Fall through to the sleep below
                        # so the endpoint is not polled back-to-back.
                        pass
                    elif qrcode_status == "EXPIRED":
                        session.status = 'expired'
                        logger.info(f"二维码已过期: {session_id}")
                        break
                    elif qrcode_status == "SCANED":
                        # Scanned, waiting for confirmation; log only on the
                        # first transition.
                        if session.status == 'waiting':
                            session.status = 'scanned'
                            logger.info(f"二维码已扫描,等待确认: {session_id}")
                    else:
                        # Any other status is treated as a cancellation.
                        session.status = 'cancelled'
                        logger.info(f"用户取消登录: {session_id}")
                        break

                    await asyncio.sleep(0.8)  # poll interval
                except Exception as e:
                    logger.error(f"监控二维码状态异常: {e}")
                    await asyncio.sleep(2)

            # Loop ended without a final state: treat it as expired.
            if session.status not in ['success', 'expired', 'cancelled', 'verification_required']:
                session.status = 'expired'
                logger.info(f"二维码监控超时,标记为过期: {session_id}")
        except Exception as e:
            logger.error(f"监控二维码状态失败: {e}")
            if session_id in self.sessions:
                self.sessions[session_id].status = 'expired'

    def get_session_status(self, session_id: str) -> Dict[str, Any]:
        """Return the current status of a session.

        Args:
            session_id: Id of the session to look up.

        Returns:
            ``{'status': 'not_found'}`` for an unknown id. Otherwise a dict
            with ``status`` and ``session_id``, plus ``verification_url``
            and ``message`` when verification is required, or ``cookies``
            and ``unb`` when login succeeded and both are available.
        """
        session = self.sessions.get(session_id)
        if not session:
            return {'status': 'not_found'}

        # A session past its lifetime is expired unless it already succeeded.
        if session.is_expired() and session.status != 'success':
            session.status = 'expired'

        result = {
            'status': session.status,
            'session_id': session_id
        }

        if session.status == 'verification_required' and session.verification_url:
            result['verification_url'] = session.verification_url
            result['message'] = '账号被风控,需要手机验证'

        if session.status == 'success' and session.cookies and session.unb:
            result['cookies'] = self._cookie_marshal(session.cookies)
            result['unb'] = session.unb

        return result

    def cleanup_expired_sessions(self):
        """Remove every session whose lifetime has elapsed."""
        # Collect ids first so the dict is not mutated while iterating.
        expired_sessions = [
            session_id
            for session_id, session in self.sessions.items()
            if session.is_expired()
        ]
        for session_id in expired_sessions:
            del self.sessions[session_id]
            logger.info(f"清理过期会话: {session_id}")

    def get_session_cookies(self, session_id: str) -> Optional[Dict[str, str]]:
        """Return the cookie string and ``unb`` of a successful session.

        Returns:
            ``{'cookies': ..., 'unb': ...}`` when the session exists and its
            status is ``'success'``; otherwise ``None``.
        """
        session = self.sessions.get(session_id)
        if session and session.status == 'success':
            return {
                'cookies': self._cookie_marshal(session.cookies),
                'unb': session.unb
            }
        return None
# Global QR-code login manager instance, created once at import time.
qr_login_manager = QRLoginManager()