feat:目录结构重大调整,谨慎更新到dev

This commit is contained in:
jxxghp
2024-05-22 20:02:14 +08:00
parent f1f187fc77
commit d9c6375252
17 changed files with 359 additions and 245 deletions

View File

@ -9,17 +9,26 @@ from app.core.config import settings
from app.core.context import MediaInfo
from app.core.meta import MetaBase
from app.core.metainfo import MetaInfo, MetaInfoPath
from app.helper.directory import DirectoryHelper
from app.helper.message import MessageHelper
from app.log import logger
from app.modules import _ModuleBase
from app.schemas import TransferInfo, ExistMediaInfo, TmdbEpisode
from app.schemas import TransferInfo, ExistMediaInfo, TmdbEpisode, MediaDirectory
from app.schemas.types import MediaType
from app.utils.string import StringUtils
from app.utils.system import SystemUtils
lock = Lock()
class FileTransferModule(_ModuleBase):
"""
文件整理模块
"""
def __init__(self):
super().__init__()
self.directoryhelper = DirectoryHelper()
self.messagehelper = MessageHelper()
def init_module(self) -> None:
pass
@ -35,34 +44,37 @@ class FileTransferModule(_ModuleBase):
"""
测试模块连接性
"""
if not settings.DOWNLOAD_PATH:
return False, "下载目录未设置"
directoryhelper = DirectoryHelper()
# 检查下载目录
download_paths: List[str] = []
for path in [settings.DOWNLOAD_PATH,
settings.DOWNLOAD_MOVIE_PATH,
settings.DOWNLOAD_TV_PATH,
settings.DOWNLOAD_ANIME_PATH]:
download_paths = directoryhelper.get_download_dirs()
if not download_paths:
return False, "下载目录未设置"
for d_path in download_paths:
path = d_path.path
if not path:
continue
return False, f"下载目录 {d_path.name} 对应路径未设置"
download_path = Path(path)
if not download_path.exists():
return False, f"下载目录 {download_path} 不存在"
download_paths.append(path)
# 下载目录的设备ID
download_devids = [Path(path).stat().st_dev for path in download_paths]
return False, f"下载目录 {d_path.name} 对应路径 {path} 不存在"
# 检查媒体库目录
if not settings.LIBRARY_PATH:
libaray_dirs = directoryhelper.get_library_dirs()
if not libaray_dirs:
return False, "媒体库目录未设置"
# 比较媒体库目录的设备ID
for path in settings.LIBRARY_PATHS:
for l_path in libaray_dirs:
path = l_path.path
if not path:
return False, f"媒体库目录 {l_path.name} 对应路径未设置"
library_path = Path(path)
if not library_path.exists():
return False, f"媒体库目录不存在:{library_path}"
return False, f"媒体库目录{l_path.name} 对应的路径 {path} 不存在"
if settings.DOWNLOADER_MONITOR and settings.TRANSFER_TYPE == "link":
if library_path.stat().st_dev not in download_devids:
return False, f"媒体库目录 {library_path} " \
f"与下载目录 {','.join(download_paths)} 不在同一设备,将无法硬链接"
for d_path in download_paths:
download_path = Path(d_path.path)
if l_path.media_type == d_path.media_type and l_path.category == d_path.category:
if library_path.stat().st_dev != download_path.stat().st_dev:
return False, f"媒体库目录 {library_path} " \
f"与下载目录 {download_path} 不在同一设备,将无法硬链接"
return True, ""
def init_setting(self) -> Tuple[str, Union[str, bool]]:
@ -83,10 +95,16 @@ class FileTransferModule(_ModuleBase):
"""
# 获取目标路径
if not target:
# 未指定目的目录,根据源目录选择一个媒体库
target = self.get_target_path(in_path=path)
# 未指定目的目录,选择一个媒体库
dir_info = DirectoryHelper().get_library_dir(mediainfo)
if not dir_info or not dir_info.path:
logger.error(f"{mediainfo.type.value} {mediainfo.title_year} 未找到有效的媒体库目录,无法转移文件,源路径:{path}")
return TransferInfo(success=False,
path=path,
message="未找到有效的媒体库目录")
# 拼装媒体库一、二级子目录
target = self.__get_dest_dir(mediainfo=mediainfo, target_dir=target)
need_scrape = dir_info.scrape
target = self.__get_dest_dir(mediainfo=mediainfo, target_dir=dir_info)
else:
# 指定了目的目录
if target.is_file():
@ -94,24 +112,18 @@ class FileTransferModule(_ModuleBase):
return TransferInfo(success=False,
path=path,
message=f"{target} 不是有效目录")
# 只拼装二级子目录(不要一级目录)
target = self.__get_dest_dir(mediainfo=mediainfo, target_dir=target, typename_dir=False)
if not target:
logger.error("未找到媒体库目录,无法转移文件")
return TransferInfo(success=False,
path=path,
message="未找到媒体库目录")
else:
logger.info(f"获取转移目标路径:{target}")
# FIXME 指定了目的目录时,拿不到是否需要刮削的配置了
need_scrape = False
logger.info(f"获取转移目标路径:{target}")
# 转移
return self.transfer_media(in_path=path,
in_meta=meta,
mediainfo=mediainfo,
transfer_type=transfer_type,
target_dir=target,
episodes_info=episodes_info)
episodes_info=episodes_info,
need_scrape=need_scrape)
@staticmethod
def __transfer_command(file_item: Path, target_file: Path, transfer_type: str) -> int:
@ -384,43 +396,17 @@ class FileTransferModule(_ModuleBase):
over_flag=over_flag)
@staticmethod
def __get_dest_dir(mediainfo: MediaInfo, target_dir: Path, typename_dir: bool = True) -> Path:
def __get_dest_dir(mediainfo: MediaInfo, target_dir: MediaDirectory) -> Path:
"""
根据设置并装媒体库目录
:param mediainfo: 媒体信息
:target_dir: 媒体库根目录
:typename_dir: 是否加上类型目录
"""
if not target_dir:
return target_dir
if mediainfo.type == MediaType.MOVIE:
# 电影
if typename_dir:
# 目的目录加上类型和二级分类
target_dir = target_dir / settings.LIBRARY_MOVIE_NAME / mediainfo.category
else:
# 目的目录加上二级分类
target_dir = target_dir / mediainfo.category
if mediainfo.type == MediaType.TV:
# 电视剧
if mediainfo.genre_ids \
and set(mediainfo.genre_ids).intersection(set(settings.ANIME_GENREIDS)):
# 动漫
if typename_dir:
target_dir = target_dir / (settings.LIBRARY_ANIME_NAME
or settings.LIBRARY_TV_NAME) / mediainfo.category
else:
target_dir = target_dir / mediainfo.category
else:
# 电视剧
if typename_dir:
target_dir = target_dir / settings.LIBRARY_TV_NAME / mediainfo.category
else:
target_dir = target_dir / mediainfo.category
return target_dir
if target_dir.auto_category or target_dir.category:
return Path(target_dir.path) / mediainfo.category
else:
return Path(target_dir.path)
def transfer_media(self,
in_path: Path,
@ -428,7 +414,8 @@ class FileTransferModule(_ModuleBase):
mediainfo: MediaInfo,
transfer_type: str,
target_dir: Path,
episodes_info: List[TmdbEpisode] = None
episodes_info: List[TmdbEpisode] = None,
need_scrape: bool = False
) -> TransferInfo:
"""
识别并转移一个文件或者一个目录下的所有文件
@ -438,6 +425,7 @@ class FileTransferModule(_ModuleBase):
:param target_dir: 媒体库根目录
:param transfer_type: 文件转移方式
:param episodes_info: 当前季的全部集信息
:param need_scrape: 是否需要刮削
:return: TransferInfo、错误信息
"""
# 检查目录路径
@ -490,7 +478,8 @@ class FileTransferModule(_ModuleBase):
path=in_path,
target_path=new_path,
total_size=file_size,
is_bluray=bluray_flag)
is_bluray=bluray_flag,
need_scrape=need_scrape)
else:
# 转移单个文件
if mediainfo.type == MediaType.TV:
@ -589,7 +578,8 @@ class FileTransferModule(_ModuleBase):
total_size=file_size,
is_bluray=False,
file_list=[str(in_path)],
file_list_new=[str(new_file)])
file_list_new=[str(new_file)],
need_scrape=need_scrape)
@staticmethod
def __get_naming_dict(meta: MetaBase, mediainfo: MediaInfo, file_ext: str = None,
@ -689,86 +679,21 @@ class FileTransferModule(_ModuleBase):
else:
return Path(render_str)
@staticmethod
def get_library_path(path: Path):
"""
根据文件路径查询其所在的媒体库目录,查询不到的返回输入目录
"""
if not path:
return None
if not settings.LIBRARY_PATHS:
return path
# 目的路径,多路径以,分隔
dest_paths = settings.LIBRARY_PATHS
for libpath in dest_paths:
try:
if path.is_relative_to(libpath):
return libpath
except Exception as e:
logger.debug(f"计算媒体库路径时出错:{str(e)}")
continue
return path
@staticmethod
def get_target_path(in_path: Path = None) -> Optional[Path]:
"""
计算一个最好的目的目录有in_path时找与in_path同路径的没有in_path时顺序查找1个符合大小要求的没有in_path和size时返回第1个
:param in_path: 源目录
"""
if not settings.LIBRARY_PATHS:
return None
# 目的路径,多路径以,分隔
dest_paths = settings.LIBRARY_PATHS
# 只有一个路径,直接返回
if len(dest_paths) == 1:
return dest_paths[0]
# 匹配有最长共同上级路径的目录
max_length = 0
target_path = None
if in_path:
for path in dest_paths:
try:
# 计算in_path和path的公共字符串长度
relative = StringUtils.find_common_prefix(str(in_path), str(path))
if len(str(path)) == len(relative):
# 目录完整匹配的,直接返回
return path
if len(relative) > max_length:
# 更新最大长度
max_length = len(relative)
target_path = path
except Exception as e:
logger.debug(f"计算目标路径时出错:{str(e)}")
continue
if target_path:
return target_path
# 顺序匹配第1个满足空间存储要求的目录
if in_path.exists():
file_size = in_path.stat().st_size
for path in dest_paths:
if SystemUtils.free_space(path) > file_size:
return path
# 默认返回第1个
return dest_paths[0]
def media_exists(self, mediainfo: MediaInfo, **kwargs) -> Optional[ExistMediaInfo]:
"""
判断媒体文件是否存在于本地文件系统,只支持标准媒体库结构
:param mediainfo: 识别的媒体信息
:return: 如不存在返回None存在时返回信息包括每季已存在所有集{type: movie/tv, seasons: {season: [episodes]}}
"""
if not settings.LIBRARY_PATHS:
return None
# 目的路径
dest_paths = settings.LIBRARY_PATHS
dest_paths = DirectoryHelper().get_library_dirs()
# 检查每一个媒体库目录
for dest_path in dest_paths:
# 媒体库路径
target_dir = self.get_target_path(dest_path)
if not target_dir:
if not dest_path.path:
continue
# 媒体分类路径
target_dir = self.__get_dest_dir(mediainfo=mediainfo, target_dir=target_dir)
target_dir = self.__get_dest_dir(mediainfo=mediainfo, target_dir=dest_path)
# 重命名格式
rename_format = settings.TV_RENAME_FORMAT \
if mediainfo.type == MediaType.TV else settings.MOVIE_RENAME_FORMAT