add 自动更新Cookie
This commit is contained in:
223
app/helper/cookie.py
Normal file
223
app/helper/cookie.py
Normal file
@ -0,0 +1,223 @@
|
||||
import base64
|
||||
from typing import Tuple, Optional
|
||||
|
||||
from lxml import etree
|
||||
from playwright.sync_api import Page
|
||||
|
||||
from app.helper.browser import PlaywrightHelper
|
||||
from app.helper.ocr import OcrHelper
|
||||
from app.log import logger
|
||||
from app.utils.http import RequestUtils
|
||||
from app.utils.site import SiteUtils
|
||||
from app.utils.string import StringUtils
|
||||
|
||||
|
||||
class CookieHelper:
|
||||
# 站点登录界面元素XPATH
|
||||
_SITE_LOGIN_XPATH = {
|
||||
"username": [
|
||||
'//input[@name="username"]',
|
||||
'//input[@id="form_item_username"]',
|
||||
'//input[@id="username"]'
|
||||
],
|
||||
"password": [
|
||||
'//input[@name="password"]',
|
||||
'//input[@id="form_item_password"]',
|
||||
'//input[@id="password"]'
|
||||
],
|
||||
"captcha": [
|
||||
'//input[@name="imagestring"]',
|
||||
'//input[@name="captcha"]',
|
||||
'//input[@id="form_item_captcha"]'
|
||||
],
|
||||
"captcha_img": [
|
||||
'//img[@alt="CAPTCHA"]/@src',
|
||||
'//img[@alt="SECURITY CODE"]/@src',
|
||||
'//img[@id="LAY-user-get-vercode"]/@src',
|
||||
'//img[contains(@src,"/api/getCaptcha")]/@src'
|
||||
],
|
||||
"submit": [
|
||||
'//input[@type="submit"]',
|
||||
'//button[@type="submit"]',
|
||||
'//button[@lay-filter="login"]',
|
||||
'//button[@lay-filter="formLogin"]',
|
||||
'//input[@type="button"][@value="登录"]'
|
||||
],
|
||||
"error": [
|
||||
"//table[@class='main']//td[@class='text']/text()"
|
||||
],
|
||||
"twostep": [
|
||||
'//input[@name="two_step_code"]',
|
||||
'//input[@name="2fa_secret"]'
|
||||
]
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def parse_cookies(cookies: list) -> str:
|
||||
"""
|
||||
将浏览器返回的cookies转化为字符串
|
||||
"""
|
||||
if not cookies:
|
||||
return ""
|
||||
cookie_str = ""
|
||||
for cookie in cookies:
|
||||
cookie_str += f"{cookie['name']}={cookie['value']}; "
|
||||
return cookie_str
|
||||
|
||||
def get_site_cookie_ua(self,
|
||||
url: str,
|
||||
username: str,
|
||||
password: str,
|
||||
proxies: dict = None) -> Tuple[Optional[str], Optional[str], str]:
|
||||
"""
|
||||
获取站点cookie和ua
|
||||
:param url: 站点地址
|
||||
:param username: 用户名
|
||||
:param password: 密码
|
||||
:param proxies: 代理
|
||||
:return: cookie、ua、message
|
||||
"""
|
||||
|
||||
def __page_handler(page: Page) -> Tuple[Optional[str], Optional[str], str]:
|
||||
"""
|
||||
页面处理
|
||||
:return: Cookie和UA
|
||||
"""
|
||||
# 登录页面代码
|
||||
html_text = page.content()
|
||||
if not html_text:
|
||||
return None, None, "获取源码失败"
|
||||
# 查找用户名输入框
|
||||
html = etree.HTML(html_text)
|
||||
username_xpath = None
|
||||
for xpath in self._SITE_LOGIN_XPATH.get("username"):
|
||||
if html.xpath(xpath):
|
||||
username_xpath = xpath
|
||||
break
|
||||
if not username_xpath:
|
||||
return None, None, "未找到用户名输入框"
|
||||
# 查找密码输入框
|
||||
password_xpath = None
|
||||
for xpath in self._SITE_LOGIN_XPATH.get("password"):
|
||||
if html.xpath(xpath):
|
||||
password_xpath = xpath
|
||||
break
|
||||
if not password_xpath:
|
||||
return None, None, "未找到密码输入框"
|
||||
# 查找验证码输入框
|
||||
captcha_xpath = None
|
||||
for xpath in self._SITE_LOGIN_XPATH.get("captcha"):
|
||||
if html.xpath(xpath):
|
||||
captcha_xpath = xpath
|
||||
break
|
||||
# 查找验证码图片
|
||||
captcha_img_url = None
|
||||
if captcha_xpath:
|
||||
for xpath in self._SITE_LOGIN_XPATH.get("captcha_img"):
|
||||
if html.xpath(xpath):
|
||||
captcha_img_url = html.xpath(xpath)[0]
|
||||
break
|
||||
if not captcha_img_url:
|
||||
return None, None, "未找到验证码图片"
|
||||
# 查找登录按钮
|
||||
submit_xpath = None
|
||||
for xpath in self._SITE_LOGIN_XPATH.get("submit"):
|
||||
if html.xpath(xpath):
|
||||
submit_xpath = xpath
|
||||
break
|
||||
if not submit_xpath:
|
||||
return None, None, "未找到登录按钮"
|
||||
# 点击登录按钮
|
||||
try:
|
||||
# 等待登录按钮准备好
|
||||
page.wait_for_selector(submit_xpath)
|
||||
# 输入用户名
|
||||
page.fill(username_xpath, username)
|
||||
# 输入密码
|
||||
page.fill(password_xpath, password)
|
||||
# 识别验证码
|
||||
if captcha_xpath and captcha_img_url:
|
||||
captcha_element = page.query_selector(captcha_xpath)
|
||||
if captcha_element.is_visible():
|
||||
# 验证码图片地址
|
||||
code_url = self.__get_captcha_url(url, captcha_img_url)
|
||||
# 获取当前的cookie和ua
|
||||
cookie = self.parse_cookies(page.context.cookies())
|
||||
ua = page.evaluate("() => window.navigator.userAgent")
|
||||
# 自动OCR识别验证码
|
||||
captcha = self.get_captcha_text(cookie=cookie, ua=ua, code_url=code_url)
|
||||
if captcha:
|
||||
logger.info("验证码地址为:%s,识别结果:%s" % (code_url, captcha))
|
||||
else:
|
||||
return None, None, "验证码识别失败"
|
||||
# 输入验证码
|
||||
captcha_element.fill(captcha)
|
||||
else:
|
||||
# 不可见元素不处理
|
||||
pass
|
||||
# 点击登录按钮
|
||||
page.click(submit_xpath)
|
||||
page.wait_for_load_state("networkidle", timeout=30 * 1000)
|
||||
except Exception as e:
|
||||
logger.error(f"仿真登录失败:{e}")
|
||||
return None, None, f"仿真登录失败:{e}"
|
||||
# 登录后的源码
|
||||
html_text = page.content()
|
||||
if not html_text:
|
||||
return None, None, "获取网页源码失败"
|
||||
if SiteUtils.is_logged_in(html_text):
|
||||
return self.parse_cookies(page.context.cookies()), \
|
||||
page.evaluate("() => window.navigator.userAgent"), ""
|
||||
else:
|
||||
# 读取错误信息
|
||||
error_xpath = None
|
||||
for xpath in self._SITE_LOGIN_XPATH.get("error"):
|
||||
if html.xpath(xpath):
|
||||
error_xpath = xpath
|
||||
break
|
||||
if not error_xpath:
|
||||
return None, None, "登录失败"
|
||||
else:
|
||||
error_msg = html.xpath(error_xpath)[0]
|
||||
return None, None, error_msg
|
||||
|
||||
if not url or not username or not password:
|
||||
return None, None, "参数错误"
|
||||
|
||||
return PlaywrightHelper().action(url=url,
|
||||
callback=__page_handler,
|
||||
proxies=proxies)
|
||||
|
||||
@staticmethod
|
||||
def get_captcha_base64(cookie: str, ua: str, image_url: str) -> str:
|
||||
"""
|
||||
根据图片地址,使用浏览器获取验证码图片base64编码
|
||||
"""
|
||||
if not image_url:
|
||||
return ""
|
||||
ret = RequestUtils(ua=ua, cookies=cookie).get_res(image_url)
|
||||
if ret:
|
||||
return base64.b64encode(ret.content).decode()
|
||||
return ""
|
||||
|
||||
def get_captcha_text(self, cookie: str, ua: str, code_url: str) -> str:
|
||||
"""
|
||||
识别验证码图片的内容
|
||||
"""
|
||||
code_b64 = self.get_captcha_base64(cookie=cookie,
|
||||
ua=ua,
|
||||
image_url=code_url)
|
||||
if not code_b64:
|
||||
return ""
|
||||
return OcrHelper().get_captcha_text(image_b64=code_b64)
|
||||
|
||||
@staticmethod
|
||||
def __get_captcha_url(siteurl: str, imageurl: str) -> str:
|
||||
"""
|
||||
获取验证码图片的URL
|
||||
"""
|
||||
if not siteurl or not imageurl:
|
||||
return ""
|
||||
if imageurl.startswith("/"):
|
||||
imageurl = imageurl[1:]
|
||||
return "%s/%s" % (StringUtils.get_base_url(siteurl), imageurl)
|
Reference in New Issue
Block a user