Merge pull request #1478 from WangEdward/main
This commit is contained in:
@ -134,6 +134,7 @@ def update_cookie(
|
||||
site_id: int,
|
||||
username: str,
|
||||
password: str,
|
||||
code: str = None,
|
||||
db: Session = Depends(get_db),
|
||||
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
|
||||
"""
|
||||
@ -149,7 +150,8 @@ def update_cookie(
|
||||
# 更新Cookie
|
||||
state, message = SiteChain().update_cookie(site_info=site_info,
|
||||
username=username,
|
||||
password=password)
|
||||
password=password,
|
||||
two_step_code=code)
|
||||
return schemas.Response(success=state, message=message)
|
||||
|
||||
|
||||
|
@ -350,7 +350,7 @@ class SiteChain(ChainBase):
|
||||
title = f"共有 {len(site_list)} 个站点,回复对应指令操作:" \
|
||||
f"\n- 禁用站点:/site_disable [id]" \
|
||||
f"\n- 启用站点:/site_enable [id]" \
|
||||
f"\n- 更新站点Cookie:/site_cookie [id] [username] [password]"
|
||||
f"\n- 更新站点Cookie:/site_cookie [id] [username] [password] [2fa_code/secret]"
|
||||
messages = []
|
||||
for site in site_list:
|
||||
if site.render:
|
||||
@ -416,12 +416,13 @@ class SiteChain(ChainBase):
|
||||
self.remote_list(channel, userid)
|
||||
|
||||
def update_cookie(self, site_info: Site,
|
||||
username: str, password: str) -> Tuple[bool, str]:
|
||||
username: str, password: str, two_step_code: str = None) -> Tuple[bool, str]:
|
||||
"""
|
||||
根据用户名密码更新站点Cookie
|
||||
:param site_info: 站点信息
|
||||
:param username: 用户名
|
||||
:param password: 密码
|
||||
:param two_step_code: 二步验证码或密钥
|
||||
:return: (是否成功, 错误信息)
|
||||
"""
|
||||
# 更新站点Cookie
|
||||
@ -429,6 +430,7 @@ class SiteChain(ChainBase):
|
||||
url=site_info.url,
|
||||
username=username,
|
||||
password=password,
|
||||
two_step_code=two_step_code,
|
||||
proxies=settings.PROXY_HOST if site_info.proxy else None
|
||||
)
|
||||
if result:
|
||||
@ -446,8 +448,8 @@ class SiteChain(ChainBase):
|
||||
"""
|
||||
使用用户名密码更新站点Cookie
|
||||
"""
|
||||
err_title = "请输入正确的命令格式:/site_cookie [id] [username] [password]," \
|
||||
"[id]为站点编号,[uername]为站点用户名,[password]为站点密码"
|
||||
err_title = "请输入正确的命令格式:/site_cookie [id] [username] [password] [2fa_code/secret]," \
|
||||
"[id]为站点编号,[uername]为站点用户名,[password]为站点密码,[2fa_code/secret]为站点二步验证码或密钥"
|
||||
if not arg_str:
|
||||
self.post_message(Notification(
|
||||
channel=channel,
|
||||
@ -455,7 +457,11 @@ class SiteChain(ChainBase):
|
||||
return
|
||||
arg_str = str(arg_str).strip()
|
||||
args = arg_str.split()
|
||||
if len(args) != 3:
|
||||
# 二步验证码
|
||||
two_step_code = None
|
||||
if len(args) == 4:
|
||||
two_step_code = args[3]
|
||||
elif len(args) != 3:
|
||||
self.post_message(Notification(
|
||||
channel=channel,
|
||||
title=err_title, userid=userid))
|
||||
@ -485,7 +491,8 @@ class SiteChain(ChainBase):
|
||||
# 更新Cookie
|
||||
status, msg = self.update_cookie(site_info=site_info,
|
||||
username=username,
|
||||
password=password)
|
||||
password=password,
|
||||
two_step_code=two_step_code)
|
||||
if not status:
|
||||
logger.error(msg)
|
||||
self.post_message(Notification(
|
||||
|
@ -6,6 +6,7 @@ from playwright.sync_api import Page
|
||||
|
||||
from app.helper.browser import PlaywrightHelper
|
||||
from app.helper.ocr import OcrHelper
|
||||
from app.helper.twofa import TwoFactorAuth
|
||||
from app.log import logger
|
||||
from app.utils.http import RequestUtils
|
||||
from app.utils.site import SiteUtils
|
||||
@ -71,12 +72,14 @@ class CookieHelper:
|
||||
url: str,
|
||||
username: str,
|
||||
password: str,
|
||||
two_step_code: str = None,
|
||||
proxies: dict = None) -> Tuple[Optional[str], Optional[str], str]:
|
||||
"""
|
||||
获取站点cookie和ua
|
||||
:param url: 站点地址
|
||||
:param username: 用户名
|
||||
:param password: 密码
|
||||
:param two_step_code: 二步验证码或密钥
|
||||
:param proxies: 代理
|
||||
:return: cookie、ua、message
|
||||
"""
|
||||
@ -107,6 +110,15 @@ class CookieHelper:
|
||||
break
|
||||
if not password_xpath:
|
||||
return None, None, "未找到密码输入框"
|
||||
# 处理二步验证码
|
||||
two_step_code = TwoFactorAuth(two_step_code).get_code()
|
||||
# 查找二步验证码输入框
|
||||
twostep_xpath = None
|
||||
if two_step_code:
|
||||
for xpath in self._SITE_LOGIN_XPATH.get("twostep"):
|
||||
if html.xpath(xpath):
|
||||
twostep_xpath = xpath
|
||||
break
|
||||
# 查找验证码输入框
|
||||
captcha_xpath = None
|
||||
for xpath in self._SITE_LOGIN_XPATH.get("captcha"):
|
||||
@ -138,6 +150,9 @@ class CookieHelper:
|
||||
page.fill(username_xpath, username)
|
||||
# 输入密码
|
||||
page.fill(password_xpath, password)
|
||||
# 输入二步验证码
|
||||
if twostep_xpath:
|
||||
page.fill(twostep_xpath, two_step_code)
|
||||
# 识别验证码
|
||||
if captcha_xpath and captcha_img_url:
|
||||
captcha_element = page.query_selector(captcha_xpath)
|
||||
|
39
app/helper/twofa.py
Normal file
39
app/helper/twofa.py
Normal file
@ -0,0 +1,39 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import struct
|
||||
import sys
|
||||
import time
|
||||
from app.log import logger
|
||||
|
||||
|
||||
class TwoFactorAuth:
|
||||
def __init__(self, code_or_secret: str):
|
||||
if code_or_secret and len(code_or_secret) > 16:
|
||||
self.code = None
|
||||
self.secret = code_or_secret
|
||||
else:
|
||||
self.code = code_or_secret
|
||||
self.secret = None
|
||||
|
||||
def __calc(self, secret_key: str) -> str:
|
||||
try:
|
||||
input_time = int(time.time()) // 30
|
||||
key = base64.b32decode(secret_key)
|
||||
msg = struct.pack(">Q", input_time)
|
||||
google_code = hmac.new(key, msg, hashlib.sha1).digest()
|
||||
o = (
|
||||
google_code[19] & 15
|
||||
if sys.version_info > (2, 7)
|
||||
else ord(str(google_code[19])) & 15
|
||||
)
|
||||
google_code = str(
|
||||
(struct.unpack(">I", google_code[o : o + 4])[0] & 0x7FFFFFFF) % 1000000
|
||||
)
|
||||
return f"0{google_code}" if len(google_code) == 5 else google_code
|
||||
except Exception as e:
|
||||
logger.error(f"计算动态验证码失败:{str(e)}")
|
||||
return None
|
||||
|
||||
def get_code(self) -> str:
|
||||
return self.code or self.__calc(self.secret)
|
Reference in New Issue
Block a user