MoviePilot/app/chain/site.py
2023-07-07 08:16:26 +08:00

218 lines
8.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import Union, Tuple
from app.chain import ChainBase
from app.core.config import settings
from app.db.models.site import Site
from app.db.site_oper import SiteOper
from app.helper.browser import PlaywrightHelper
from app.helper.cloudflare import under_challenge
from app.helper.cookie import CookieHelper
from app.helper.message import MessageHelper
from app.log import logger
from app.utils.http import RequestUtils
from app.utils.site import SiteUtils
from app.utils.string import StringUtils
class SiteChain(ChainBase):
    """
    Processing chain for site management.

    Provides a connectivity/login test for a stored site, the remote
    command handlers that list, enable and disable sites, and a
    username/password based Cookie & UA refresh.
    """

    def __init__(self):
        super().__init__()
        self.siteoper = SiteOper()
        self.cookiehelper = CookieHelper()
        self.message = MessageHelper()

    def test(self, url: str) -> Tuple[bool, str]:
        """
        Test whether a site is usable.

        The site record is looked up by the domain of ``url``. Depending on
        the record's ``render`` flag the page is fetched either through
        ``PlaywrightHelper`` or through ``RequestUtils``; for non-public
        sites the fetched page must additionally pass
        ``SiteUtils.is_logged_in``.

        :param url: site URL (only its domain is used for the lookup)
        :return: (usable, message)
        """
        # Resolve the stored site record from the URL's domain.
        domain = StringUtils.get_url_domain(url)
        site_info = self.siteoper.get_by_domain(domain)
        if not site_info:
            return False, f"站点【{url}】不存在"
        site_url = site_info.url
        site_cookie = site_info.cookie
        ua = site_info.ua
        render = site_info.render
        public = site_info.public
        proxies = settings.PROXY if site_info.proxy else None
        proxy_server = settings.PROXY_SERVER if site_info.proxy else None
        # Simulate a login by opening the site with the stored cookie.
        try:
            if render:
                # Browser-emulated access.
                page_source = PlaywrightHelper().get_page_source(
                    url=site_url,
                    cookies=site_cookie,
                    ua=ua,
                    proxies=proxy_server,
                )
                if not public and not SiteUtils.is_logged_in(page_source):
                    if under_challenge(page_source):
                        return False, "无法通过Cloudflare"
                    return False, "仿真登录失败Cookie已失效"
            else:
                res = RequestUtils(
                    cookies=site_cookie,
                    ua=ua,
                    proxies=proxies,
                ).get_res(url=site_url)
                # Compare against None explicitly instead of relying on the
                # response's truthiness.
                # NOTE(review): assumes get_res returns a requests.Response
                # (or None); Response.__bool__ is False for 4xx/5xx, which
                # made the 403/500 handling below unreachable — confirm.
                if res is None:
                    return False, "无法打开网站!"
                if res.status_code not in (200, 500, 403):
                    return False, f"状态码:{res.status_code}"
                # Check the login state.
                if not public and not SiteUtils.is_logged_in(res.text):
                    if under_challenge(res.text):
                        msg = "站点被Cloudflare防护请打开站点浏览器仿真"
                    elif res.status_code == 200:
                        msg = "Cookie已失效"
                    else:
                        msg = f"状态码:{res.status_code}"
                    return False, msg
                if public and res.status_code != 200:
                    return False, f"状态码:{res.status_code}"
        except Exception as e:
            # Best-effort probe: any failure is reported as the test result
            # instead of being propagated to the caller.
            return False, str(e)
        return True, "连接成功"

    def remote_list(self, userid: Union[str, int] = None):
        """
        Query all sites and send them as a message.

        :param userid: recipient passed through to ``post_message``
        """
        site_list = self.siteoper.list()
        if not site_list:
            # Nothing to list: notify the requester and stop, rather than
            # also sending an empty "0 sites" list.
            self.post_message(title="没有维护任何站点信息!", userid=userid)
            return
        title = (
            f"共有 {len(site_list)} 个站点,回复对应指令操作:"
            "\n- 禁用站点:/site_disable [id]"
            "\n- 启用站点:/site_enable [id]"
            "\n- 更新站点Cookie:/site_cookie [id] [username] [password]"
        )
        messages = []
        for site in site_list:
            # Sites with the render flag set get a compass marker.
            render_str = "🧭" if site.render else ""
            if site.is_active:
                messages.append(f"{site.id}. [{site.name}]({site.url}){render_str}")
            else:
                # Inactive sites are listed without link and marker.
                messages.append(f"{site.id}. {site.name}")
        # Send the list.
        self.post_message(title=title, text="\n".join(messages), userid=userid)

    def _set_site_active(self, arg_str, userid: Union[str, int], active: bool):
        """
        Shared implementation of the enable/disable commands.

        Empty or non-numeric arguments are ignored without a reply; an
        unknown site id is reported back to ``userid``.

        :param arg_str: raw command argument, expected to be a site id
        :param userid: recipient of the replies
        :param active: value written to the site's ``is_active`` field
        """
        if not arg_str:
            return
        arg_str = str(arg_str).strip()
        if not arg_str.isdigit():
            return
        site_id = int(arg_str)
        site = self.siteoper.get(site_id)
        if not site:
            self.post_message(title=f"站点编号 {site_id} 不存在!", userid=userid)
            return
        self.siteoper.update(site_id, {
            "is_active": active
        })
        # Send the refreshed list back to the user who issued the command.
        self.remote_list(userid=userid)

    def remote_disable(self, arg_str, userid: Union[str, int] = None):
        """
        Disable a site.

        :param arg_str: raw command argument, expected to be a site id
        :param userid: recipient of the replies
        """
        self._set_site_active(arg_str, userid, False)

    def remote_enable(self, arg_str, userid: Union[str, int] = None):
        """
        Enable a site.

        :param arg_str: raw command argument, expected to be a site id
        :param userid: recipient of the replies
        """
        self._set_site_active(arg_str, userid, True)

    def update_cookie(self, site_info: Site,
                      username: str, password: str) -> Tuple[bool, str]:
        """
        Update a site's Cookie (and UA) using username and password.

        :param site_info: site record
        :param username: user name
        :param password: password
        :return: (success, message)
        """
        # Obtain a fresh cookie/UA pair; the proxy is only used when the
        # site record has its proxy flag set.
        result = self.cookiehelper.get_site_cookie_ua(
            url=site_info.url,
            username=username,
            password=password,
            proxies=settings.PROXY_HOST if site_info.proxy else None
        )
        if not result:
            return False, "未知错误"
        cookie, ua, msg = result
        if not cookie:
            return False, msg
        self.siteoper.update(site_info.id, {
            "cookie": cookie,
            "ua": ua
        })
        return True, msg

    def remote_cookie(self, arg_str: str, userid: Union[str, int] = None):
        """
        Update a site's Cookie using username and password (remote command).

        ``arg_str`` must contain exactly three whitespace-separated tokens:
        a numeric site id, the user name and the password. Any other shape
        is answered with the usage text.

        :param arg_str: raw command arguments
        :param userid: recipient of the replies
        """
        err_title = "请输入正确的命令格式:/site_cookie [id] [username] [password]," \
                    "[id]为站点编号,[username]为站点用户名,[password]为站点密码"
        if not arg_str:
            self.post_message(title=err_title, userid=userid)
            return
        args = str(arg_str).strip().split()
        if len(args) != 3:
            self.post_message(title=err_title, userid=userid)
            return
        site_id, username, password = args
        if not site_id.isdigit():
            self.post_message(title=err_title, userid=userid)
            return
        # Site id.
        site_id = int(site_id)
        # Site record.
        site_info = self.siteoper.get(site_id)
        if not site_info:
            self.post_message(title=f"站点编号 {site_id} 不存在!", userid=userid)
            return
        self.post_message(title=f"开始更新【{site_info.name}】Cookie&UA ...", userid=userid)
        # Update the cookie.
        status, msg = self.update_cookie(site_info=site_info,
                                         username=username,
                                         password=password)
        if not status:
            logger.error(msg)
            self.post_message(title=f"【{site_info.name}】 Cookie&UA更新失败",
                              text=f"错误原因:{msg}",
                              userid=userid)
        else:
            self.post_message(title=f"【{site_info.name}】 Cookie&UA更新成功",
                              userid=userid)