import base64 from typing import Tuple, Optional from lxml import etree from playwright.sync_api import Page from app.helper.browser import PlaywrightHelper from app.helper.ocr import OcrHelper from app.log import logger from app.utils.http import RequestUtils from app.utils.site import SiteUtils from app.utils.string import StringUtils class CookieHelper: # 站点登录界面元素XPATH _SITE_LOGIN_XPATH = { "username": [ '//input[@name="username"]', '//input[@id="form_item_username"]', '//input[@id="username"]' ], "password": [ '//input[@name="password"]', '//input[@id="form_item_password"]', '//input[@id="password"]', '//input[@type="password"]' ], "captcha": [ '//input[@name="imagestring"]', '//input[@name="captcha"]', '//input[@id="form_item_captcha"]', '//input[@placeholder="驗證碼"]' ], "captcha_img": [ '//img[@alt="captcha"]/@src', '//img[@alt="CAPTCHA"]/@src', '//img[@alt="SECURITY CODE"]/@src', '//img[@id="LAY-user-get-vercode"]/@src', '//img[contains(@src,"/api/getCaptcha")]/@src' ], "submit": [ '//input[@type="submit"]', '//button[@type="submit"]', '//button[@lay-filter="login"]', '//button[@lay-filter="formLogin"]', '//input[@type="button"][@value="登录"]' ], "error": [ "//table[@class='main']//td[@class='text']/text()" ], "twostep": [ '//input[@name="two_step_code"]', '//input[@name="2fa_secret"]' ] } @staticmethod def parse_cookies(cookies: list) -> str: """ 将浏览器返回的cookies转化为字符串 """ if not cookies: return "" cookie_str = "" for cookie in cookies: cookie_str += f"{cookie['name']}={cookie['value']}; " return cookie_str def get_site_cookie_ua(self, url: str, username: str, password: str, proxies: dict = None) -> Tuple[Optional[str], Optional[str], str]: """ 获取站点cookie和ua :param url: 站点地址 :param username: 用户名 :param password: 密码 :param proxies: 代理 :return: cookie、ua、message """ def __page_handler(page: Page) -> Tuple[Optional[str], Optional[str], str]: """ 页面处理 :return: Cookie和UA """ # 登录页面代码 html_text = page.content() if not html_text: return None, None, "获取源码失败" # 查找用户名输入框 html = etree.HTML(html_text) username_xpath = None for xpath in self._SITE_LOGIN_XPATH.get("username"): if html.xpath(xpath): username_xpath = xpath break if not username_xpath: return None, None, "未找到用户名输入框" # 查找密码输入框 password_xpath = None for xpath in self._SITE_LOGIN_XPATH.get("password"): if html.xpath(xpath): password_xpath = xpath break if not password_xpath: return None, None, "未找到密码输入框" # 查找验证码输入框 captcha_xpath = None for xpath in self._SITE_LOGIN_XPATH.get("captcha"): if html.xpath(xpath): captcha_xpath = xpath break # 查找验证码图片 captcha_img_url = None if captcha_xpath: for xpath in self._SITE_LOGIN_XPATH.get("captcha_img"): if html.xpath(xpath): captcha_img_url = html.xpath(xpath)[0] break if not captcha_img_url: return None, None, "未找到验证码图片" # 查找登录按钮 submit_xpath = None for xpath in self._SITE_LOGIN_XPATH.get("submit"): if html.xpath(xpath): submit_xpath = xpath break if not submit_xpath: return None, None, "未找到登录按钮" # 点击登录按钮 try: # 等待登录按钮准备好 page.wait_for_selector(submit_xpath) # 输入用户名 page.fill(username_xpath, username) # 输入密码 page.fill(password_xpath, password) # 识别验证码 if captcha_xpath and captcha_img_url: captcha_element = page.query_selector(captcha_xpath) if captcha_element.is_visible(): # 验证码图片地址 code_url = self.__get_captcha_url(url, captcha_img_url) # 获取当前的cookie和ua cookie = self.parse_cookies(page.context.cookies()) ua = page.evaluate("() => window.navigator.userAgent") # 自动OCR识别验证码 captcha = self.__get_captcha_text(cookie=cookie, ua=ua, code_url=code_url) if captcha: logger.info("验证码地址为:%s,识别结果:%s" % (code_url, captcha)) else: return None, None, "验证码识别失败" # 输入验证码 captcha_element.fill(captcha) else: # 不可见元素不处理 pass # 点击登录按钮 page.click(submit_xpath) page.wait_for_load_state("networkidle", timeout=30 * 1000) except Exception as e: logger.error(f"仿真登录失败:{str(e)}") return None, None, f"仿真登录失败:{str(e)}" # 登录后的源码 html_text = page.content() if not html_text: return None, None, "获取网页源码失败" if SiteUtils.is_logged_in(html_text): return self.parse_cookies(page.context.cookies()), \ page.evaluate("() => window.navigator.userAgent"), "" else: # 读取错误信息 error_xpath = None for xpath in self._SITE_LOGIN_XPATH.get("error"): if html.xpath(xpath): error_xpath = xpath break if not error_xpath: return None, None, "登录失败" else: error_msg = html.xpath(error_xpath)[0] return None, None, error_msg if not url or not username or not password: return None, None, "参数错误" return PlaywrightHelper().action(url=url, callback=__page_handler, proxies=proxies) @staticmethod def __get_captcha_text(cookie: str, ua: str, code_url: str) -> str: """ 识别验证码图片的内容 """ if not code_url: return "" ret = RequestUtils(ua=ua, cookies=cookie).get_res(code_url) if ret: if not ret.content: return "" return OcrHelper().get_captcha_text( image_b64=base64.b64encode(ret.content).decode() ) else: return "" @staticmethod def __get_captcha_url(siteurl: str, imageurl: str) -> str: """ 获取验证码图片的URL """ if not siteurl or not imageurl: return "" if imageurl.startswith("/"): imageurl = imageurl[1:] return "%s/%s" % (StringUtils.get_base_url(siteurl), imageurl)