This commit is contained in:
jxxghp
2023-08-30 22:01:07 +08:00
parent 9326676bb6
commit 38eff64c95
6 changed files with 269 additions and 333 deletions

View File

@ -30,20 +30,17 @@ class FileTransferModule(_ModuleBase):
def init_setting(self) -> Tuple[str, Union[str, bool]]:
pass
def transfer(self, path: Path, mediainfo: MediaInfo,
def transfer(self, path: Path, meta: MetaBase, mediainfo: MediaInfo,
transfer_type: str, target: Path = None,
meta: MetaBase = None,
epformat: EpisodeFormat = None,
min_filesize: int = 0) -> TransferInfo:
epformat: EpisodeFormat = None) -> TransferInfo:
"""
文件转移
:param path: 文件路径
:param meta: 预识别的元数据,仅单文件转移时传递
:param mediainfo: 识别的媒体信息
:param transfer_type: 转移方式
:param target: 目标路径
:param meta: 预识别的元数据,仅单文件转移时传递
:param epformat: 集识别格式
:param min_filesize: 最小文件大小(MB)
:return: {path, target_path, message}
"""
# 获取目标路径
@ -54,12 +51,11 @@ class FileTransferModule(_ModuleBase):
return TransferInfo(message="未找到媒体库目录,无法转移文件")
# 转移
return self.transfer_media(in_path=path,
in_meta=meta,
mediainfo=mediainfo,
transfer_type=transfer_type,
target_dir=target,
in_meta=meta,
epformat=epformat,
min_filesize=min_filesize)
epformat=epformat)
@staticmethod
def __transfer_command(file_item: Path, target_file: Path, transfer_type: str) -> int:
@ -244,9 +240,9 @@ class FileTransferModule(_ModuleBase):
logger.error(f"音轨文件 {file_name} {transfer_type}失败:{reason}")
return 0
def __transfer_bluray_dir(self, file_path: Path, new_path: Path, transfer_type: str) -> int:
def __transfer_dir(self, file_path: Path, new_path: Path, transfer_type: str) -> int:
"""
转移蓝光文件夹
转移整个文件夹
:param file_path: 原路径
:param new_path: 新路径
:param transfer_type: RmtMode转移方式
@ -265,7 +261,7 @@ class FileTransferModule(_ModuleBase):
def __transfer_dir_files(self, src_dir: Path, target_dir: Path, transfer_type: str) -> int:
"""
按目录结构转移所有文件
按目录结构转移目录下所有文件
:param src_dir: 原路径
:param target_dir: 新路径
:param transfer_type: RmtMode转移方式
@ -321,22 +317,20 @@ class FileTransferModule(_ModuleBase):
def transfer_media(self,
in_path: Path,
in_meta: MetaBase,
mediainfo: MediaInfo,
transfer_type: str,
target_dir: Path = None,
in_meta: MetaBase = None,
target_dir: Path,
epformat: EpisodeFormat = None,
min_filesize: int = 0
) -> TransferInfo:
"""
识别并转移一个文件、多个文件或者目录
识别并转移一个文件或者一个目录下的所有文件
:param in_path: 转移的路径,可能是一个文件也可以是一个目录
:param in_meta预识别元数据
:param mediainfo: 媒体信息
:param target_dir: 目的文件夹,非空的转移到该文件夹,为空时则按类型转移到配置文件中的媒体库文件夹
:param transfer_type: 文件转移方式
:param in_meta预识别元数为空则重新识别
:param epformat: 识别的剧集格式
:param min_filesize: 最小文件大小MB小于该值的文件不转移
:return: TransferInfo、错误信息
"""
# 检查目录路径
@ -372,160 +366,96 @@ class FileTransferModule(_ModuleBase):
rename_format = settings.TV_RENAME_FORMAT \
if mediainfo.type == MediaType.TV else settings.MOVIE_RENAME_FORMAT
# 总大小
total_filesize = 0
# 处理文件清单
file_list = []
# 目标文件清单
file_list_new = []
# 失败文件清单
fail_list = []
# 错误信息
err_msgs = []
# 判断是否为蓝光原盘
bluray_flag = SystemUtils.is_bluray_dir(in_path)
if bluray_flag:
# 识别目录名称,不包括后缀
meta = MetaInfo(in_path.stem)
# 判断是否为文件夹
if in_path.is_dir():
# 转移整个目录
# 是否蓝光原盘
bluray_flag = SystemUtils.is_bluray_dir(in_path)
# 目的路径
new_path = self.get_rename_path(
path=target_dir,
template_string=rename_format,
rename_dict=self.__get_naming_dict(meta=meta,
rename_dict=self.__get_naming_dict(meta=in_meta,
mediainfo=mediainfo)
).parent
# 转移蓝光原盘
retcode = self.__transfer_bluray_dir(file_path=in_path,
new_path=new_path,
transfer_type=transfer_type)
retcode = self.__transfer_dir(file_path=in_path,
new_path=new_path,
transfer_type=transfer_type)
if retcode != 0:
return TransferInfo(message=f"{retcode},蓝光原盘转移失败")
else:
# 计算大小
total_filesize += in_path.stat().st_size
# 返回转移后的路径
return TransferInfo(path=in_path,
target_path=new_path,
total_size=total_filesize,
is_bluray=bluray_flag,
file_list=[],
file_list_new=[])
logger.error(f"文件夹 {in_path} 转移失败,错误码:{retcode}")
return TransferInfo(message=f"文件夹 {in_path} 转移失败,错误码:{retcode}")
logger.info(f"文件夹 {in_path} 转移成功")
# 返回转移后的路径
return TransferInfo(path=in_path,
target_path=new_path,
total_size=in_path.stat().st_size,
is_bluray=bluray_flag)
else:
# 获取文件清单
transfer_files: List[Path] = SystemUtils.list_files(
directory=in_path,
extensions=settings.RMT_MEDIAEXT,
min_filesize=min_filesize
)
if len(transfer_files) == 0:
return TransferInfo(message=f"{in_path} 目录下没有找到可转移的文件")
# 转移单个文件
# 文件结束季为空
in_meta.end_season = None
# 文件总季数为1
if in_meta.total_season:
in_meta.total_season = 1
# 文件不可能有多集
if in_meta.total_episode > 2:
in_meta.total_episode = 1
in_meta.end_episode = None
# 有集自定义格式
formaterHandler = FormatParser(eformat=epformat.format,
details=epformat.detail,
part=epformat.part,
offset=epformat.offset) if epformat else None
# 过滤出符合自定义剧集格式的文件
# 自定义识别集数、PART
if formaterHandler:
transfer_files = [x for x in transfer_files if formaterHandler.match(x.name)]
if len(transfer_files) == 0:
return TransferInfo(message=f"{in_path} 目录下没有找到符合自定义剧集格式的文件")
# 开始集、结束集、PART
begin_ep, end_ep, part = formaterHandler.split_episode(in_path.stem)
if begin_ep is not None:
in_meta.begin_episode = begin_ep
in_meta.part = part
if end_ep is not None:
in_meta.end_episode = end_ep
if not in_meta:
# 识别目录名称,不包括后缀
meta = MetaInfo(in_path.stem)
else:
meta = in_meta
# 目的路径
new_path = target_dir / (self.get_rename_path(
# 目的文件名
new_file = self.get_rename_path(
path=target_dir,
template_string=rename_format,
rename_dict=self.__get_naming_dict(meta=meta,
mediainfo=mediainfo)).parents[-2].name)
# 转移所有文件
for transfer_file in transfer_files:
try:
if not in_meta:
# 识别文件元数据,不包含后缀
file_meta = MetaInfo(transfer_file.stem)
# 合并元数据
file_meta.merge(meta)
else:
file_meta = in_meta
rename_dict=self.__get_naming_dict(
meta=in_meta,
mediainfo=mediainfo,
file_ext=in_path.suffix
)
)
# 文件结束季为空
file_meta.end_season = None
# 文件总季数为1
if file_meta.total_season:
file_meta.total_season = 1
# 文件不可能有多集
if file_meta.total_episode > 2:
file_meta.total_episode = 1
file_meta.end_episode = None
# 判断是否要覆盖
overflag = False
if new_file.exists():
if new_file.stat().st_size < in_path.stat().st_size:
logger.info(f"目标文件已存在,但文件大小更小,将覆盖:{new_file}")
overflag = True
# 自定义识别
if formaterHandler:
# 开始集、结束集、PART
begin_ep, end_ep, part = formaterHandler.split_episode(transfer_file.stem)
if begin_ep is not None:
file_meta.begin_episode = begin_ep
file_meta.part = part
if end_ep is not None:
file_meta.end_episode = end_ep
# 目的文件名
new_file = self.get_rename_path(
path=target_dir,
template_string=rename_format,
rename_dict=self.__get_naming_dict(meta=file_meta,
mediainfo=mediainfo,
file_ext=transfer_file.suffix)
)
# 判断是否要覆盖
overflag = False
if new_file.exists():
if new_file.stat().st_size < transfer_file.stat().st_size:
logger.info(f"目标文件已存在,但文件大小更小,将覆盖:{new_file}")
overflag = True
# 转移文件
retcode = self.__transfer_file(file_item=transfer_file,
new_file=new_file,
transfer_type=transfer_type,
over_flag=overflag)
if retcode != 0:
logger.error(f"{transfer_file} 转移文件失败,错误码:{retcode}")
err_msgs.append(f"{transfer_file.name}:错误码 {retcode}")
fail_list.append(transfer_file)
continue
# 源文件清单
file_list.append(str(transfer_file))
# 目的文件清单
file_list_new.append(str(new_file))
# 计算总大小
total_filesize += new_file.stat().st_size
except Exception as err:
err_msgs.append(f"{transfer_file.name}{err}")
logger.error(f"{transfer_file}转移失败:{err}")
fail_list.append(transfer_file)
if not file_list:
# 没有成功的
return TransferInfo(message="\n".join(err_msgs))
# 转移文件
retcode = self.__transfer_file(file_item=in_path,
new_file=new_file,
transfer_type=transfer_type,
over_flag=overflag)
if retcode != 0:
logger.error(f"文件 {in_path} 转移失败,错误码:{retcode}")
return TransferInfo(message=f"文件 {in_path.name} 转移失败,错误码:{retcode}",
fail_list=[in_path])
logger.info(f"文件 {in_path} 转移成功")
return TransferInfo(path=in_path,
target_path=new_path,
message="\n".join(err_msgs),
file_count=len(file_list),
total_size=total_filesize,
fail_list=fail_list,
is_bluray=bluray_flag,
file_list=file_list,
file_list_new=file_list_new)
target_path=new_file,
file_count=1,
total_size=in_path.stat().st_size,
is_bluray=False,
file_list=[in_path],
file_list_new=[new_file])
@staticmethod
def __get_naming_dict(meta: MetaBase, mediainfo: MediaInfo, file_ext: str = None) -> dict: