223 lines
8.4 KiB
Python
223 lines
8.4 KiB
Python
from typing import Union, Tuple
|
||
|
||
from app.chain import ChainBase
|
||
from app.core.config import settings
|
||
from app.db.models.site import Site
|
||
from app.db.site_oper import SiteOper
|
||
from app.helper.browser import PlaywrightHelper
|
||
from app.helper.cloudflare import under_challenge
|
||
from app.helper.cookie import CookieHelper
|
||
from app.helper.message import MessageHelper
|
||
from app.log import logger
|
||
from app.utils.http import RequestUtils
|
||
from app.utils.site import SiteUtils
|
||
from app.utils.string import StringUtils
|
||
|
||
|
||
class SiteChain(ChainBase):
|
||
"""
|
||
站点管理处理链
|
||
"""
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.siteoper = SiteOper()
|
||
self.cookiehelper = CookieHelper()
|
||
self.message = MessageHelper()
|
||
|
||
def test(self, url: str) -> Tuple[bool, str]:
|
||
"""
|
||
测试站点是否可用
|
||
:param url: 站点域名
|
||
:return: (是否可用, 错误信息)
|
||
"""
|
||
# 检查域名是否可用
|
||
domain = StringUtils.get_url_domain(url)
|
||
site_info = self.siteoper.get_by_domain(domain)
|
||
if not site_info:
|
||
return False, f"站点【{url}】不存在"
|
||
site_url = site_info.url
|
||
site_cookie = site_info.cookie
|
||
ua = site_info.ua
|
||
render = site_info.render
|
||
proxies = settings.PROXY if site_info.proxy else None
|
||
proxy_server = settings.PROXY_SERVER if site_info.proxy else None
|
||
# 模拟登录
|
||
try:
|
||
# 访问链接
|
||
if render:
|
||
page_source = PlaywrightHelper().get_page_source(url=site_url,
|
||
cookies=site_cookie,
|
||
ua=ua,
|
||
proxies=proxy_server)
|
||
if not SiteUtils.is_logged_in(page_source):
|
||
if under_challenge(page_source):
|
||
return False, f"无法通过Cloudflare!"
|
||
return False, f"仿真登录失败,Cookie已失效!"
|
||
else:
|
||
res = RequestUtils(cookies=site_cookie,
|
||
ua=ua,
|
||
proxies=proxies
|
||
).get_res(url=site_url)
|
||
# 判断登录状态
|
||
if res and res.status_code in [200, 500, 403]:
|
||
if not SiteUtils.is_logged_in(res.text):
|
||
if under_challenge(res.text):
|
||
msg = "站点被Cloudflare防护,请打开站点浏览器仿真"
|
||
elif res.status_code == 200:
|
||
msg = "Cookie已失效"
|
||
else:
|
||
msg = f"状态码:{res.status_code}"
|
||
return False, f"连接失败,{msg}!"
|
||
else:
|
||
return True, f"连接成功"
|
||
elif res is not None:
|
||
return False, f"连接失败,状态码:{res.status_code}!"
|
||
else:
|
||
return False, f"连接失败,无法打开网站!"
|
||
except Exception as e:
|
||
return False, f"连接失败:{str(e)}!"
|
||
return True, "连接成功"
|
||
|
||
def remote_list(self, userid: Union[str, int] = None):
|
||
"""
|
||
查询所有站点,发送消息
|
||
"""
|
||
site_list = self.siteoper.list()
|
||
if not site_list:
|
||
self.post_message(title="没有维护任何站点信息!")
|
||
title = f"共有 {len(site_list)} 个站点,回复对应指令操作:" \
|
||
f"\n- 禁用站点:/site_disable [id]" \
|
||
f"\n- 启用站点:/site_enable [id]" \
|
||
f"\n- 更新站点Cookie:/site_cookie [id] [username] [password]"
|
||
messages = []
|
||
for site in site_list:
|
||
if site.render:
|
||
render_str = "🧭"
|
||
else:
|
||
render_str = ""
|
||
if site.is_active:
|
||
messages.append(f"{site.id}. [{site.name}]({site.url}){render_str}")
|
||
else:
|
||
messages.append(f"{site.id}. {site.name}")
|
||
# 发送列表
|
||
self.post_message(title=title, text="\n".join(messages), userid=userid)
|
||
|
||
def remote_disable(self, arg_str, userid: Union[str, int] = None):
|
||
"""
|
||
禁用站点
|
||
"""
|
||
if not arg_str:
|
||
return
|
||
arg_str = str(arg_str).strip()
|
||
if not arg_str.isdigit():
|
||
return
|
||
site_id = int(arg_str)
|
||
site = self.siteoper.get(site_id)
|
||
if not site:
|
||
self.post_message(title=f"站点编号 {site_id} 不存在!", userid=userid)
|
||
return
|
||
# 禁用站点
|
||
self.siteoper.update(site_id, {
|
||
"is_active": False
|
||
})
|
||
# 重新发送消息
|
||
self.remote_list()
|
||
|
||
def remote_enable(self, arg_str, userid: Union[str, int] = None):
|
||
"""
|
||
启用站点
|
||
"""
|
||
if not arg_str:
|
||
return
|
||
arg_str = str(arg_str).strip()
|
||
if not arg_str.isdigit():
|
||
return
|
||
site_id = int(arg_str)
|
||
site = self.siteoper.get(site_id)
|
||
if not site:
|
||
self.post_message(title=f"站点编号 {site_id} 不存在!", userid=userid)
|
||
return
|
||
# 禁用站点
|
||
self.siteoper.update(site_id, {
|
||
"is_active": True
|
||
})
|
||
# 重新发送消息
|
||
self.remote_list()
|
||
|
||
def update_cookie(self, site_info: Site,
|
||
username: str, password: str,
|
||
manual=False) -> Tuple[bool, str]:
|
||
"""
|
||
根据用户名密码更新站点Cookie
|
||
:param site_info: 站点信息
|
||
:param username: 用户名
|
||
:param password: 密码
|
||
:param manual: 是否手动更新
|
||
:return: (是否成功, 错误信息)
|
||
"""
|
||
# 更新站点Cookie
|
||
result = self.cookiehelper.get_site_cookie_ua(
|
||
url=site_info.url,
|
||
username=username,
|
||
password=password,
|
||
proxies=settings.PROXY_HOST if site_info.proxy else None
|
||
)
|
||
if result:
|
||
cookie, ua, msg = result
|
||
if not cookie:
|
||
if manual:
|
||
self.message.put(f"站点 {site_info.name} Cookie更新失败:{msg}!")
|
||
return False, msg
|
||
self.siteoper.update(site_info.id, {
|
||
"cookie": cookie,
|
||
"ua": ua
|
||
})
|
||
if manual:
|
||
self.message.put(f"站点 {site_info.name} Cookie更新成功!")
|
||
return True, msg
|
||
return False, "未知错误"
|
||
|
||
def remote_cookie(self, arg_str: str, userid: Union[str, int] = None):
|
||
"""
|
||
使用用户名密码更新站点Cookie
|
||
"""
|
||
err_title = "请输入正确的命令格式:/site_cookie [id] [username] [password]," \
|
||
"[id]为站点编号,[uername]为站点用户名,[password]为站点密码"
|
||
if not arg_str:
|
||
self.post_message(title=err_title, userid=userid)
|
||
return
|
||
arg_str = str(arg_str).strip()
|
||
args = arg_str.split()
|
||
if len(args) != 3:
|
||
self.post_message(title=err_title, userid=userid)
|
||
return
|
||
site_id = args[0]
|
||
if not site_id.isdigit():
|
||
self.post_message(title=err_title, userid=userid)
|
||
return
|
||
# 站点ID
|
||
site_id = int(site_id)
|
||
# 站点信息
|
||
site_info = self.siteoper.get(site_id)
|
||
if not site_info:
|
||
self.post_message(title=f"站点编号 {site_id} 不存在!", userid=userid)
|
||
return
|
||
self.post_message(title=f"开始更新【{site_info.name}】Cookie&UA ...", userid=userid)
|
||
# 用户名
|
||
username = args[1]
|
||
# 密码
|
||
password = args[2]
|
||
# 更新Cookie
|
||
status, msg = self.update_cookie(site_info=site_info,
|
||
username=username,
|
||
password=password)
|
||
if not status:
|
||
logger.error(msg)
|
||
self.post_message(title=f"【{site_info.name}】 Cookie&UA更新失败!",
|
||
text=f"错误原因:{msg}",
|
||
userid=userid)
|
||
else:
|
||
self.post_message(title=f"【{site_info.name}】 Cookie&UA更新成功",
|
||
userid=userid)
|