diff --git a/app/api/endpoints/site.py b/app/api/endpoints/site.py index 93a2d853..e5747c7b 100644 --- a/app/api/endpoints/site.py +++ b/app/api/endpoints/site.py @@ -134,6 +134,7 @@ def update_cookie( site_id: int, username: str, password: str, + code: str = None, db: Session = Depends(get_db), _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ @@ -149,7 +150,8 @@ def update_cookie( # 更新Cookie state, message = SiteChain().update_cookie(site_info=site_info, username=username, - password=password) + password=password, + two_step_code=code) return schemas.Response(success=state, message=message) diff --git a/app/chain/site.py b/app/chain/site.py index eb0c3d13..5813debb 100644 --- a/app/chain/site.py +++ b/app/chain/site.py @@ -350,7 +350,7 @@ class SiteChain(ChainBase): title = f"共有 {len(site_list)} 个站点,回复对应指令操作:" \ f"\n- 禁用站点:/site_disable [id]" \ f"\n- 启用站点:/site_enable [id]" \ - f"\n- 更新站点Cookie:/site_cookie [id] [username] [password]" + f"\n- 更新站点Cookie:/site_cookie [id] [username] [password] [2fa_code/secret]" messages = [] for site in site_list: if site.render: @@ -416,12 +416,13 @@ class SiteChain(ChainBase): self.remote_list(channel, userid) def update_cookie(self, site_info: Site, - username: str, password: str) -> Tuple[bool, str]: + username: str, password: str, two_step_code: str = None) -> Tuple[bool, str]: """ 根据用户名密码更新站点Cookie :param site_info: 站点信息 :param username: 用户名 :param password: 密码 + :param two_step_code: 二步验证码或密钥 :return: (是否成功, 错误信息) """ # 更新站点Cookie @@ -429,6 +430,7 @@ class SiteChain(ChainBase): url=site_info.url, username=username, password=password, + two_step_code=two_step_code, proxies=settings.PROXY_HOST if site_info.proxy else None ) if result: @@ -446,8 +448,8 @@ class SiteChain(ChainBase): """ 使用用户名密码更新站点Cookie """ - err_title = "请输入正确的命令格式:/site_cookie [id] [username] [password]," \ - "[id]为站点编号,[uername]为站点用户名,[password]为站点密码" + err_title = "请输入正确的命令格式:/site_cookie [id] [username] [password] [2fa_code/secret]," \ + "[id]为站点编号,[uername]为站点用户名,[password]为站点密码,[2fa_code/secret]为站点二步验证码或密钥" if not arg_str: self.post_message(Notification( channel=channel, @@ -455,7 +457,11 @@ class SiteChain(ChainBase): return arg_str = str(arg_str).strip() args = arg_str.split() - if len(args) != 3: + # 二步验证码 + two_step_code = None + if len(args) == 4: + two_step_code = args[3] + elif len(args) != 3: self.post_message(Notification( channel=channel, title=err_title, userid=userid)) @@ -485,7 +491,8 @@ class SiteChain(ChainBase): # 更新Cookie status, msg = self.update_cookie(site_info=site_info, username=username, - password=password) + password=password, + two_step_code=two_step_code) if not status: logger.error(msg) self.post_message(Notification( diff --git a/app/helper/cookie.py b/app/helper/cookie.py index 87f7dfba..e5ec88c0 100644 --- a/app/helper/cookie.py +++ b/app/helper/cookie.py @@ -6,6 +6,7 @@ from playwright.sync_api import Page from app.helper.browser import PlaywrightHelper from app.helper.ocr import OcrHelper +from app.helper.twofa import TwoFactorAuth from app.log import logger from app.utils.http import RequestUtils from app.utils.site import SiteUtils @@ -71,12 +72,14 @@ class CookieHelper: url: str, username: str, password: str, + two_step_code: str = None, proxies: dict = None) -> Tuple[Optional[str], Optional[str], str]: """ 获取站点cookie和ua :param url: 站点地址 :param username: 用户名 :param password: 密码 + :param two_step_code: 二步验证码或密钥 :param proxies: 代理 :return: cookie、ua、message """ @@ -107,6 +110,15 @@ class CookieHelper: break if not password_xpath: return None, None, "未找到密码输入框" + # 处理二步验证码 + two_step_code = TwoFactorAuth(two_step_code).get_code() + # 查找二步验证码输入框 + twostep_xpath = None + if two_step_code: + for xpath in self._SITE_LOGIN_XPATH.get("twostep"): + if html.xpath(xpath): + twostep_xpath = xpath + break # 查找验证码输入框 captcha_xpath = None for xpath in self._SITE_LOGIN_XPATH.get("captcha"): @@ -138,6 +150,9 @@ class CookieHelper: page.fill(username_xpath, username) # 输入密码 page.fill(password_xpath, password) + # 输入二步验证码 + if twostep_xpath: + page.fill(twostep_xpath, two_step_code) # 识别验证码 if captcha_xpath and captcha_img_url: captcha_element = page.query_selector(captcha_xpath) diff --git a/app/helper/twofa.py b/app/helper/twofa.py new file mode 100644 index 00000000..40d4e39b --- /dev/null +++ b/app/helper/twofa.py @@ -0,0 +1,39 @@ +import base64 +import hashlib +import hmac +import struct +import sys +import time +from app.log import logger + + +class TwoFactorAuth: + def __init__(self, code_or_secret: str): + if code_or_secret and len(code_or_secret) > 16: + self.code = None + self.secret = code_or_secret + else: + self.code = code_or_secret + self.secret = None + + def __calc(self, secret_key: str) -> str: + try: + input_time = int(time.time()) // 30 + key = base64.b32decode(secret_key) + msg = struct.pack(">Q", input_time) + google_code = hmac.new(key, msg, hashlib.sha1).digest() + o = ( + google_code[19] & 15 + if sys.version_info > (2, 7) + else ord(str(google_code[19])) & 15 + ) + google_code = str( + (struct.unpack(">I", google_code[o : o + 4])[0] & 0x7FFFFFFF) % 1000000 + ) + return f"0{google_code}" if len(google_code) == 5 else google_code + except Exception as e: + logger.error(f"计算动态验证码失败:{str(e)}") + return None + + def get_code(self) -> str: + return self.code or self.__calc(self.secret)