add 站点字幕自动下载

This commit is contained in:
jxxghp 2023-06-13 12:34:38 +08:00
parent 6e2f40cae6
commit 2b9975b9b2
2 changed files with 132 additions and 13 deletions

View File

@ -4,6 +4,7 @@ from pathlib import Path
from typing import Tuple, Optional, List, Union
from urllib.parse import unquote
from requests import Response
from torrentool.api import Torrent
from app.core.config import settings
@ -97,21 +98,13 @@ class TorrentHelper:
# 检查是不是种子文件,如果不是仍然抛出异常
try:
# 读取种子文件名
file_name = self.__get_url_torrent_filename(req, url)
file_name = self.get_url_filename(req, url)
# 种子文件路径
file_path = Path(settings.TEMP_PATH) / file_name
# 保存到文件
file_path.write_bytes(req.content)
# 获取种子目录和文件清单
torrentinfo = Torrent.from_file(file_path)
# 获取目录名
folder_name = torrentinfo.name
# 获取文件清单
if len(torrentinfo.files) <= 1:
# 单文件种子
file_list = [torrentinfo.name]
else:
file_list = [fileinfo.name for fileinfo in torrentinfo.files]
folder_name, file_list = self.get_torrent_info(file_path)
# 成功拿到种子数据
return file_path, req.content, folder_name, file_list, ""
except Exception as err:
@ -128,7 +121,31 @@ class TorrentHelper:
return None, None, "", [], f"下载种子出错,状态码:{req.status_code}"
@staticmethod
def __get_url_torrent_filename(req, url: str) -> str:
def get_torrent_info(torrent_path: Path) -> Tuple[str, List[str]]:
    """
    Read the top-level name and the file listing from a torrent file.

    :param torrent_path: path of the torrent file on disk
    :return: ``(name, file_names)``; ``("", [])`` when the path is empty,
             does not exist, or the torrent cannot be parsed
    """
    # Guard: nothing to parse without an existing path
    if not (torrent_path and torrent_path.exists()):
        return "", []
    try:
        parsed = Torrent.from_file(torrent_path)
        root_name = parsed.name
        entries = parsed.files
        if len(entries) > 1:
            # Multi-file torrent: report every contained file
            names = [entry.name for entry in entries]
        else:
            # Single-file torrent: the torrent name is the only file
            names = [root_name]
    except Exception as err:
        logger.error(f"种子文件解析失败:{err}")
        return "", []
    return root_name, names
@staticmethod
def get_url_filename(req: Response, url: str) -> str:
"""
从下载请求中获取种子文件名
"""

View File

@ -1,8 +1,17 @@
import shutil
from pathlib import Path
from typing import Tuple, Union
from lxml import etree
from app.core.config import settings
from app.core.context import Context
from app.helper.torrent import TorrentHelper
from app.log import logger
from app.modules import _ModuleBase
from app.utils.http import RequestUtils
from app.utils.string import StringUtils
from app.utils.system import SystemUtils
class SubtitleModule(_ModuleBase):
@ -10,6 +19,11 @@ class SubtitleModule(_ModuleBase):
字幕下载模块
"""
# XPath expressions used to recognise subtitle download links on a site's detail page.
# Each expression yields the href of every <a> inside the cell(s) that follow
# a td.rowhead cell whose text is "字幕".
_SITE_SUBTITLE_XPATH = [
    '//td[@class="rowhead"][text()="字幕"]/following-sibling::td//a/@href',
]
def init_module(self) -> None:
    """
    Module initialisation hook; this module performs no setup here.
    """
    return None
@ -21,10 +35,98 @@ class SubtitleModule(_ModuleBase):
def download_added(self, context: Context, torrent_path: Path) -> None:
    """
    After a download task has been added, fetch the subtitles linked on the
    site's detail page and copy them into the torrent's download directory.

    The detail page (``torrent.page_url``) is requested with the site cookie
    and UA, every link matched by ``_SITE_SUBTITLE_XPATH`` is downloaded, ZIP
    archives are unpacked and files with a ``settings.RMT_SUBEXT`` extension
    are copied over; any other download is copied as-is.

    :param context: context object; only ``context.torrent_info`` is read
                    (``page_url``, ``site_cookie``, ``site_ua``)
    :param torrent_path: path of the torrent file
    :return: None; failures are logged rather than raised
    """
    torrent = context.torrent_info
    if not torrent.page_url:
        return
    logger.info("开始从站点下载字幕:%s" % torrent.page_url)

    # The download directory is derived from the torrent's top-level name
    folder_name, _ = TorrentHelper.get_torrent_info(torrent_path)
    download_dir = Path(settings.DOWNLOAD_PATH) / folder_name
    if not download_dir.is_dir():
        logger.warn(f"下载目录不正确:{download_dir}")
        return

    # Fetch the detail page
    request = RequestUtils(cookies=torrent.site_cookie, headers=torrent.site_ua)
    res = request.get_res(torrent.page_url)
    if res is None:
        logger.warn(f"无法打开链接:{torrent.page_url}")
        return
    if res.status_code != 200:
        logger.warn(f"连接 {torrent.page_url} 失败,状态码:{res.status_code}")
        return
    if not res.text:
        logger.warn(f"读取页面代码失败:{torrent.page_url}")
        return
    html = etree.HTML(res.text)
    if html is None:
        # etree.HTML() yields None when no document could be built
        logger.warn(f"读取页面代码失败:{torrent.page_url}")
        return

    # Collect absolute subtitle links (the base URL is loop-invariant)
    base_url = StringUtils.get_base_url(torrent.page_url)
    sublink_list = []
    for xpath in self._SITE_SUBTITLE_XPATH:
        for sublink in html.xpath(xpath) or []:
            if not sublink:
                continue
            if not sublink.startswith("http"):
                if sublink.startswith("/"):
                    sublink = "%s%s" % (base_url, sublink)
                else:
                    sublink = "%s/%s" % (base_url, sublink)
            sublink_list.append(sublink)

    # Download every subtitle file
    temp_dir = Path(settings.TEMP_PATH)
    for sublink in sublink_list:
        logger.info(f"找到字幕下载链接:{sublink},开始下载...")
        ret = request.get_res(sublink)
        if not ret or ret.status_code != 200:
            logger.error(f"下载字幕文件失败:{sublink}")
            continue
        file_name = TorrentHelper.get_url_filename(ret, sublink)
        # The name is supplied by the remote site: keep only its last path
        # component so it cannot point outside the temp/download directories
        file_name = Path(file_name).name if file_name else ""
        if not file_name or file_name == "..":
            logger.warn(f"链接不是字幕文件:{sublink}")
            continue

        temp_file = temp_dir / file_name
        extract_dir = None
        try:
            temp_file.write_bytes(ret.content)
            if file_name.lower().endswith(".zip"):
                # ZIP package: unpack next to the archive, then copy the subtitles
                extract_dir = temp_file.with_name(temp_file.stem)
                shutil.unpack_archive(temp_file, extract_dir, format='zip')
                for sub_file in SystemUtils.list_files_with_extensions(extract_dir, settings.RMT_SUBEXT):
                    target_sub_file = download_dir / sub_file.name
                    if target_sub_file.exists():
                        logger.info(f"字幕文件已存在:{target_sub_file}")
                        continue
                    logger.info(f"转移字幕 {sub_file} 到 {target_sub_file} ...")
                    SystemUtils.copy(sub_file, target_sub_file)
            else:
                target_sub_file = download_dir / file_name
                logger.info(f"转移字幕 {temp_file} 到 {target_sub_file}")
                SystemUtils.copy(temp_file, target_sub_file)
        except Exception as err:
            # One broken archive or failed copy must not abort the remaining links
            logger.error(f"处理字幕文件失败:{sublink},{err}")
        finally:
            # Always clean up whatever was written to the temp directory
            try:
                if extract_dir is not None and extract_dir.exists():
                    shutil.rmtree(extract_dir)
                if temp_file.exists():
                    temp_file.unlink()
            except OSError as err:
                logger.error(f"删除临时文件失败:{err}")

    if sublink_list:
        logger.info(f"{torrent.page_url} 页面字幕下载完成")
    else:
        logger.warn(f"{torrent.page_url} 页面未找到字幕下载链接")