xianyu-auto-reply/utils/qr_login.py
2025-08-02 11:29:42 +08:00

388 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
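Xianyu (Goofish) QR-code login utility.
Generates the login QR code and retrieves cookies through the HTTP API (modelled on the myfish-main project).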
"""
import asyncio
import time
import uuid
import json
import re
from random import random
from typing import Optional, Dict, Any
import httpx
import qrcode
import qrcode.constants
from loguru import logger
def generate_headers():
    """Build the browser-like HTTP request headers.

    Returns:
        A new ``dict`` on every call, mapping header names to values.
    """
    header_pairs = (
        ('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36'),
        ('Accept', 'application/json, text/plain, */*'),
        ('Accept-Language', 'zh-CN,zh;q=0.9,en;q=0.8'),
        ('Accept-Encoding', 'gzip, deflate, br'),
        ('Connection', 'keep-alive'),
        ('Sec-Fetch-Dest', 'empty'),
        ('Sec-Fetch-Mode', 'cors'),
        ('Sec-Fetch-Site', 'same-origin'),
    )
    return dict(header_pairs)
class GetLoginParamsError(Exception):
    """Raised when the login form parameters cannot be obtained."""
class GetLoginQRCodeError(Exception):
    """Raised when the login QR code cannot be obtained."""
class NotLoginError(Exception):
    """Error type for a not-logged-in condition."""
class QRLoginSession:
    """State of a single QR-code login attempt."""

    def __init__(self, session_id: str):
        """Create a fresh session in the ``'waiting'`` state.

        Args:
            session_id: Identifier under which the session is tracked.
        """
        self.session_id = session_id
        # One of: waiting, scanned, success, expired, cancelled.
        self.status = 'waiting'
        self.qr_code_url = None
        self.qr_content = None
        self.cookies = {}
        self.unb = None
        self.created_time = time.time()
        # Lifetime in seconds (5 minutes).
        self.expire_time = 300
        # Login parameters collected for this session.
        self.params = {}

    def is_expired(self) -> bool:
        """Return True once the session is older than ``expire_time`` seconds."""
        age = time.time() - self.created_time
        return age > self.expire_time

    def to_dict(self) -> Dict[str, Any]:
        """Return a dictionary summary of the session.

        Cookies, login parameters and the raw QR content are not included.
        """
        return dict(
            session_id=self.session_id,
            status=self.status,
            qr_code_url=self.qr_code_url,
            created_time=self.created_time,
            is_expired=self.is_expired(),
        )
class QRLoginManager:
    """Coordinate QR-code logins against the Goofish passport endpoints.

    For every QR code the manager creates a ``QRLoginSession``, renders the
    code as a PNG data URL, polls the passport server from a background task
    until the code reaches a final state, and exposes the outcome (status
    and cookies) by session id.
    """

    # Seconds to wait between two status polls of a pending QR code.
    POLL_INTERVAL = 0.8
    # Seconds to wait before polling again after a failed poll.
    ERROR_RETRY_DELAY = 2
    # Maximum number of seconds a single QR code is monitored (5 minutes).
    MAX_WAIT_SECONDS = 300

    def __init__(self):
        """Set up the empty session table, request headers and endpoint URLs."""
        self.sessions: Dict[str, "QRLoginSession"] = {}
        self.headers = generate_headers()
        self.host = "https://passport.goofish.com"
        self.api_mini_login = f"{self.host}/mini_login.htm"
        self.api_generate_qr = f"{self.host}/newlogin/qrcode/generate.do"
        self.api_scan_status = f"{self.host}/newlogin/qrcode/query.do"
        self.api_h5_tk = "https://h5api.m.goofish.com/h5/mtop.gaia.nodejs.gaia.idle.data.gw.v2.index.get/1.0/"
        # Strong references to the running monitor tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._monitor_tasks: set = set()

    def _cookie_marshal(self, cookies: dict) -> str:
        """Serialize a cookie dict as a ``"k1=v1; k2=v2"`` string."""
        return "; ".join(f"{k}={v}" for k, v in cookies.items())

    async def _get_mh5tk(self, session: "QRLoginSession") -> dict:
        """Request the h5 gateway and collect the cookies it sets.

        Every cookie found on the response is copied into ``session.cookies``.
        The original note on this method names the cookies of interest as
        ``m_h5_tk`` and ``m_h5_tk_enc``.

        Returns:
            The cookies of this response as a plain dict.
        """
        params = {
            "jsv": "2.7.2",
            "appKey": "34839810",
            "t": int(time.time()),
            "sign": "",
            "v": "1.0",
            "type": "originaljson",
            "accountSite": "xianyu",
            "dataType": "json",
            "timeout": 20000,
            "api": "mtop.gaia.nodejs.gaia.idle.data.gw.v2.index.get",
            "sessionOption": "AutoLoginOnly",
            "spm_cnt": "a21ybx.home.0.0",
        }
        async with httpx.AsyncClient(follow_redirects=True) as client:
            resp = await client.post(
                self.api_h5_tk, params=params, headers=self.headers
            )
        cookies = dict(resp.cookies.items())
        session.cookies.update(cookies)
        return cookies

    async def _get_login_params(self, session: "QRLoginSession") -> dict:
        """Fetch the form parameters required by the QR-code login endpoints.

        The mini-login page embeds them in a ``window.viewData = {...};``
        script assignment; its ``loginFormData`` entry is extracted, tagged
        with ``umidTag`` and merged into ``session.params``.

        Returns:
            The ``loginFormData`` dict.

        Raises:
            GetLoginParamsError: If the page has no ``window.viewData``
                assignment or it carries no ``loginFormData``.
        """
        params = {
            "lang": "zh_cn",
            "appName": "xianyu",
            "appEntrance": "web",
            "styleType": "vertical",
            "bizParams": "",
            "notLoadSsoView": False,
            "notKeepLogin": False,
            "isMobile": False,
            "qrCodeFirst": False,
            # NOTE(review): the key is spelled "stie" in the original code;
            # confirm this is what the endpoint expects before changing it.
            "stie": 77,
            "rnd": random(),
        }
        async with httpx.AsyncClient(follow_redirects=True) as client:
            resp = await client.get(
                self.api_mini_login,
                params=params,
                cookies=session.cookies,
                headers=self.headers,
            )

        # Pull the JSON object assigned to window.viewData out of the page.
        pattern = r"window\.viewData\s*=\s*(\{.*?\});"
        match = re.search(pattern, resp.text)
        if not match:
            raise GetLoginParamsError("获取登录参数失败")

        view_data = json.loads(match.group(1))
        data = view_data.get("loginFormData")
        if not data:
            raise GetLoginParamsError("未找到loginFormData")

        data["umidTag"] = "SERVER"
        session.params.update(data)
        return data

    @staticmethod
    def _render_qr_data_url(content: str) -> str:
        """Render ``content`` as a QR code and return it as a PNG data URL."""
        from io import BytesIO
        import base64

        qr = qrcode.QRCode(
            version=5,
            error_correction=qrcode.constants.ERROR_CORRECT_L,
            box_size=10,
            border=2,
        )
        qr.add_data(content)
        qr.make()

        buffer = BytesIO()
        qr.make_image().save(buffer, format='PNG')
        encoded = base64.b64encode(buffer.getvalue()).decode()
        return f"data:image/png;base64,{encoded}"

    async def generate_qr_code(self) -> Dict[str, Any]:
        """Create a login session, generate its QR code and start monitoring.

        Returns:
            ``{'success': True, 'session_id': ..., 'qr_code_url': ...}`` on
            success, where ``qr_code_url`` is a PNG data URL, or
            ``{'success': False, 'message': ...}`` on any failure. This
            method does not raise; failures are logged and reported in the
            returned dict.
        """
        try:
            session_id = str(uuid.uuid4())
            session = QRLoginSession(session_id)

            # 1. Collect the initial gateway cookies.
            await self._get_mh5tk(session)
            logger.info(f"获取m_h5_tk成功: {session_id}")

            # 2. Collect the login form parameters.
            login_params = await self._get_login_params(session)
            logger.info(f"获取登录参数成功: {session_id}")

            # 3. Ask the server for a QR code.
            async with httpx.AsyncClient(follow_redirects=True) as client:
                resp = await client.get(
                    self.api_generate_qr,
                    params=login_params,
                    headers=self.headers,
                )
                results = resp.json()

            content = results.get("content", {})
            # Loose comparison kept on purpose to match the original check.
            if content.get("success") != True:  # noqa: E712
                raise GetLoginQRCodeError("获取登录二维码失败")

            qr_data = content["data"]
            # "t" and "ck" are stored in the parameters posted when polling.
            session.params.update({
                "t": qr_data["t"],
                "ck": qr_data["ck"],
            })
            qr_content = qr_data["codeContent"]
            session.qr_content = qr_content

            qr_data_url = self._render_qr_data_url(qr_content)
            session.qr_code_url = qr_data_url
            session.status = 'waiting'
            self.sessions[session_id] = session

            # Keep a reference to the task until it completes; a bare
            # create_task() result may be garbage-collected mid-run.
            task = asyncio.create_task(self._monitor_qr_status(session_id))
            self._monitor_tasks.add(task)
            task.add_done_callback(self._monitor_tasks.discard)

            logger.info(f"二维码生成成功: {session_id}")
            return {
                'success': True,
                'session_id': session_id,
                'qr_code_url': qr_data_url
            }
        except Exception as e:
            logger.error(f"生成二维码失败: {e}")
            return {'success': False, 'message': f'生成二维码失败: {str(e)}'}

    async def _poll_qrcode_status(self, session: "QRLoginSession") -> "httpx.Response":
        """Post the session's parameters to the scan-status endpoint once."""
        async with httpx.AsyncClient(follow_redirects=True) as client:
            resp = await client.post(
                self.api_scan_status,
                data=session.params,
                cookies=session.cookies,
                headers=self.headers,
            )
        return resp

    async def _monitor_qr_status(self, session_id: str):
        """Poll the QR-code status until it reaches a final state or times out.

        Updates ``session.status`` as the server reports progress and, on a
        confirmed login, copies the response cookies (and ``unb``) into the
        session. Every non-final poll is followed by a ``POLL_INTERVAL``
        pause, including while the code is still unscanned.
        """
        try:
            session = self.sessions.get(session_id)
            if not session:
                return
            logger.info(f"开始监控二维码状态: {session_id}")

            deadline = time.monotonic() + self.MAX_WAIT_SECONDS
            while time.monotonic() < deadline:
                try:
                    # Stop as soon as the session has been removed.
                    if session_id not in self.sessions:
                        break

                    resp = await self._poll_qrcode_status(session)
                    # Parse the body once and reuse it for every lookup.
                    data = resp.json().get("content", {}).get("data", {})
                    qrcode_status = data.get("qrCodeStatus")

                    if qrcode_status == "CONFIRMED":
                        if data.get("iframeRedirect") is True:
                            # Risk control triggered: phone verification is
                            # required, so the login cannot complete here.
                            session.status = 'cancelled'
                            iframe_url = data.get("iframeRedirectUrl")
                            logger.warning(f"账号被风控,需要手机验证: {session_id}, URL: {iframe_url}")
                        else:
                            session.status = 'success'
                            for k, v in resp.cookies.items():
                                session.cookies[k] = v
                                if k == 'unb':
                                    session.unb = v
                            logger.info(f"扫码登录成功: {session_id}, UNB: {session.unb}")
                        break

                    if qrcode_status == "EXPIRED":
                        session.status = 'expired'
                        logger.info(f"二维码已过期: {session_id}")
                        break

                    if qrcode_status == "SCANED":
                        # Scanned, waiting for confirmation on the phone.
                        if session.status == 'waiting':
                            session.status = 'scanned'
                            logger.info(f"二维码已扫描,等待确认: {session_id}")
                    elif qrcode_status != "NEW":
                        # Any other status is treated as a user cancellation.
                        session.status = 'cancelled'
                        logger.info(f"用户取消登录: {session_id}")
                        break

                    # "NEW" and "SCANED" both land here, so the server is
                    # never polled without a pause in between.
                    await asyncio.sleep(self.POLL_INTERVAL)
                except Exception as e:
                    logger.error(f"监控二维码状态异常: {e}")
                    await asyncio.sleep(self.ERROR_RETRY_DELAY)

            # Anything that has not reached a final state is now expired.
            if session.status not in ['success', 'expired', 'cancelled']:
                session.status = 'expired'
                logger.info(f"二维码监控超时,标记为过期: {session_id}")
        except Exception as e:
            logger.error(f"监控二维码状态失败: {e}")
            if session_id in self.sessions:
                self.sessions[session_id].status = 'expired'

    def get_session_status(self, session_id: str) -> Dict[str, Any]:
        """Report the current status of a session.

        Returns:
            ``{'status': 'not_found'}`` for an unknown id; otherwise a dict
            with ``status`` and ``session_id``, plus ``cookies`` (serialized
            string) and ``unb`` once the login succeeded and both are known.
            A session past its lifetime that has not succeeded is marked
            ``'expired'`` as a side effect.
        """
        session = self.sessions.get(session_id)
        if not session:
            return {'status': 'not_found'}

        if session.is_expired() and session.status != 'success':
            session.status = 'expired'

        result = {
            'status': session.status,
            'session_id': session_id
        }
        if session.status == 'success' and session.cookies and session.unb:
            result['cookies'] = self._cookie_marshal(session.cookies)
            result['unb'] = session.unb
        return result

    def cleanup_expired_sessions(self):
        """Remove every session whose lifetime has elapsed."""
        expired_sessions = [
            session_id
            for session_id, session in self.sessions.items()
            if session.is_expired()
        ]
        for session_id in expired_sessions:
            del self.sessions[session_id]
            logger.info(f"清理过期会话: {session_id}")

    def get_session_cookies(self, session_id: str) -> Optional[Dict[str, str]]:
        """Return the cookies of a successfully logged-in session.

        Returns:
            ``{'cookies': <serialized cookie string>, 'unb': <unb>}`` when
            the session exists and its status is ``'success'``, else ``None``.
        """
        session = self.sessions.get(session_id)
        if session and session.status == 'success':
            return {
                'cookies': self._cookie_marshal(session.cookies),
                'unb': session.unb
            }
        return None
# Global QR-code login manager instance (created when this module is imported).
qr_login_manager = QRLoginManager()