From 39ad54f3d90e35757eccd003fc81d68f91ff7482 Mon Sep 17 00:00:00 2001 From: Edward <73746306+WangEdward@users.noreply.github.com> Date: Wed, 14 Feb 2024 05:30:41 +0000 Subject: [PATCH 1/4] =?UTF-8?q?feat=20=E6=96=B0=E5=A2=9E=202fa=20helper?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/helper/twofa.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 app/helper/twofa.py diff --git a/app/helper/twofa.py b/app/helper/twofa.py new file mode 100644 index 00000000..4c24dab1 --- /dev/null +++ b/app/helper/twofa.py @@ -0,0 +1,39 @@ +import base64 +import hashlib +import hmac +import struct +import sys +import time +from app.log import logger + + +class TwoFactorAuth: + def __init__(self, code_or_secret: str): + if len(code_or_secret) < 16: + self.code = code_or_secret + self.secret = None + else: + self.code = None + self.secret = code_or_secret + + def __calc(self, secret_key: str) -> str: + try: + input_time = int(time.time()) // 30 + key = base64.b32decode(secret_key) + msg = struct.pack(">Q", input_time) + google_code = hmac.new(key, msg, hashlib.sha1).digest() + o = ( + google_code[19] & 15 + if sys.version_info > (2, 7) + else ord(str(google_code[19])) & 15 + ) + google_code = str( + (struct.unpack(">I", google_code[o : o + 4])[0] & 0x7FFFFFFF) % 1000000 + ) + return f"0{google_code}" if len(google_code) == 5 else google_code + except Exception as e: + logger.error(f"计算动态验证码失败:{str(e)}") + return None + + def get_code(self) -> str: + return self.code or self.__calc(self.secret) From 85fd9b3c09496a112c2b2509a26bf56aab5c060a Mon Sep 17 00:00:00 2001 From: Edward <73746306+WangEdward@users.noreply.github.com> Date: Wed, 14 Feb 2024 08:47:02 +0000 Subject: [PATCH 2/4] =?UTF-8?q?feat=20=E4=B8=BA=20update=5Fcookie=20?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=202fa=20=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/endpoints/site.py | 4 +++- app/chain/site.py | 19 +++++++++++++------ app/helper/cookie.py | 15 +++++++++++++++ 3 files changed, 31 insertions(+), 7 deletions(-) diff --git a/app/api/endpoints/site.py b/app/api/endpoints/site.py index 93a2d853..94d490b6 100644 --- a/app/api/endpoints/site.py +++ b/app/api/endpoints/site.py @@ -134,6 +134,7 @@ def update_cookie( site_id: int, username: str, password: str, + two_step_code: str = None, db: Session = Depends(get_db), _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ @@ -149,7 +150,8 @@ def update_cookie( # 更新Cookie state, message = SiteChain().update_cookie(site_info=site_info, username=username, - password=password) + password=password, + two_step_code=two_step_code) return schemas.Response(success=state, message=message) diff --git a/app/chain/site.py b/app/chain/site.py index eb0c3d13..5813debb 100644 --- a/app/chain/site.py +++ b/app/chain/site.py @@ -350,7 +350,7 @@ class SiteChain(ChainBase): title = f"共有 {len(site_list)} 个站点,回复对应指令操作:" \ f"\n- 禁用站点:/site_disable [id]" \ f"\n- 启用站点:/site_enable [id]" \ - f"\n- 更新站点Cookie:/site_cookie [id] [username] [password]" + f"\n- 更新站点Cookie:/site_cookie [id] [username] [password] [2fa_code/secret]" messages = [] for site in site_list: if site.render: @@ -416,12 +416,13 @@ class SiteChain(ChainBase): self.remote_list(channel, userid) def update_cookie(self, site_info: Site, - username: str, password: str) -> Tuple[bool, str]: + username: str, password: str, two_step_code: str = None) -> Tuple[bool, str]: """ 根据用户名密码更新站点Cookie :param site_info: 站点信息 :param username: 用户名 :param password: 密码 + :param two_step_code: 二步验证码或密钥 :return: (是否成功, 错误信息) """ # 更新站点Cookie @@ -429,6 +430,7 @@ class SiteChain(ChainBase): url=site_info.url, username=username, password=password, + two_step_code=two_step_code, proxies=settings.PROXY_HOST if site_info.proxy else None ) if result: @@ -446,8 +448,8 @@ class SiteChain(ChainBase): """ 使用用户名密码更新站点Cookie """ - err_title = "请输入正确的命令格式:/site_cookie [id] [username] [password]," \ - "[id]为站点编号,[uername]为站点用户名,[password]为站点密码" + err_title = "请输入正确的命令格式:/site_cookie [id] [username] [password] [2fa_code/secret]," \ + "[id]为站点编号,[uername]为站点用户名,[password]为站点密码,[2fa_code/secret]为站点二步验证码或密钥" if not arg_str: self.post_message(Notification( channel=channel, @@ -455,7 +457,11 @@ class SiteChain(ChainBase): return arg_str = str(arg_str).strip() args = arg_str.split() - if len(args) != 3: + # 二步验证码 + two_step_code = None + if len(args) == 4: + two_step_code = args[3] + elif len(args) != 3: self.post_message(Notification( channel=channel, title=err_title, userid=userid)) @@ -485,7 +491,8 @@ class SiteChain(ChainBase): # 更新Cookie status, msg = self.update_cookie(site_info=site_info, username=username, - password=password) + password=password, + two_step_code=two_step_code) if not status: logger.error(msg) self.post_message(Notification( diff --git a/app/helper/cookie.py b/app/helper/cookie.py index 87f7dfba..e5ec88c0 100644 --- a/app/helper/cookie.py +++ b/app/helper/cookie.py @@ -6,6 +6,7 @@ from playwright.sync_api import Page from app.helper.browser import PlaywrightHelper from app.helper.ocr import OcrHelper +from app.helper.twofa import TwoFactorAuth from app.log import logger from app.utils.http import RequestUtils from app.utils.site import SiteUtils @@ -71,12 +72,14 @@ class CookieHelper: url: str, username: str, password: str, + two_step_code: str = None, proxies: dict = None) -> Tuple[Optional[str], Optional[str], str]: """ 获取站点cookie和ua :param url: 站点地址 :param username: 用户名 :param password: 密码 + :param two_step_code: 二步验证码或密钥 :param proxies: 代理 :return: cookie、ua、message """ @@ -107,6 +110,15 @@ class CookieHelper: break if not password_xpath: return None, None, "未找到密码输入框" + # 处理二步验证码 + two_step_code = TwoFactorAuth(two_step_code).get_code() + # 查找二步验证码输入框 + twostep_xpath = None + if two_step_code: + for xpath in self._SITE_LOGIN_XPATH.get("twostep"): + if html.xpath(xpath): + twostep_xpath = xpath + break # 查找验证码输入框 captcha_xpath = None for xpath in self._SITE_LOGIN_XPATH.get("captcha"): @@ -138,6 +150,9 @@ class CookieHelper: page.fill(username_xpath, username) # 输入密码 page.fill(password_xpath, password) + # 输入二步验证码 + if twostep_xpath: + page.fill(twostep_xpath, two_step_code) # 识别验证码 if captcha_xpath and captcha_img_url: captcha_element = page.query_selector(captcha_xpath) From 02981d38c021a85573b30b374eefbc3510c929bd Mon Sep 17 00:00:00 2001 From: Edward <73746306+WangEdward@users.noreply.github.com> Date: Wed, 14 Feb 2024 08:47:55 +0000 Subject: [PATCH 3/4] =?UTF-8?q?chore=20=E9=87=8D=E5=91=BD=E5=90=8D=202fa?= =?UTF-8?q?=20=E5=8F=82=E6=95=B0=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/endpoints/site.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/api/endpoints/site.py b/app/api/endpoints/site.py index 94d490b6..e5747c7b 100644 --- a/app/api/endpoints/site.py +++ b/app/api/endpoints/site.py @@ -134,7 +134,7 @@ def update_cookie( site_id: int, username: str, password: str, - two_step_code: str = None, + code: str = None, db: Session = Depends(get_db), _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ @@ -151,7 +151,7 @@ def update_cookie( state, message = SiteChain().update_cookie(site_info=site_info, username=username, password=password, - two_step_code=two_step_code) + two_step_code=code) return schemas.Response(success=state, message=message) From dc43aabe2a40a457d9bec395dcaa63bda4c0c876 Mon Sep 17 00:00:00 2001 From: Edward <73746306+WangEdward@users.noreply.github.com> Date: Wed, 14 Feb 2024 08:51:20 +0000 Subject: [PATCH 4/4] fix 2fa helper --- app/helper/twofa.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/app/helper/twofa.py b/app/helper/twofa.py index 4c24dab1..40d4e39b 100644 --- a/app/helper/twofa.py +++ b/app/helper/twofa.py @@ -9,12 +9,12 @@ from app.log import logger class TwoFactorAuth: def __init__(self, code_or_secret: str): - if len(code_or_secret) < 16: - self.code = code_or_secret - self.secret = None - else: + if code_or_secret and len(code_or_secret) > 16: self.code = None self.secret = code_or_secret + else: + self.code = code_or_secret + self.secret = None def __calc(self, secret_key: str) -> str: try: