841 lines
		
	
	
		
			37 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			841 lines
		
	
	
		
			37 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import re
 | ||
| from pathlib import Path
 | ||
| from threading import Lock
 | ||
| from typing import Optional, List, Tuple, Union, Dict
 | ||
| 
 | ||
| from jinja2 import Template
 | ||
| 
 | ||
| from app.core.config import settings
 | ||
| from app.core.context import MediaInfo
 | ||
| from app.core.meta import MetaBase
 | ||
| from app.core.metainfo import MetaInfo, MetaInfoPath
 | ||
| from app.log import logger
 | ||
| from app.modules import _ModuleBase
 | ||
| from app.schemas import TransferInfo, ExistMediaInfo, TmdbEpisode
 | ||
| from app.schemas.types import MediaType
 | ||
| from app.utils.string import StringUtils
 | ||
| from app.utils.system import SystemUtils
 | ||
| 
 | ||
| lock = Lock()
 | ||
| 
 | ||
| 
 | ||
| class FileTransferModule(_ModuleBase):
 | ||
| 
 | ||
|     def init_module(self) -> None:
 | ||
|         pass
 | ||
| 
 | ||
|     def stop(self):
 | ||
|         pass
 | ||
| 
 | ||
|     def test(self) -> Tuple[bool, str]:
 | ||
|         """
 | ||
|         测试模块连接性
 | ||
|         """
 | ||
|         if not settings.DOWNLOAD_PATH:
 | ||
|             return False, "下载目录未设置"
 | ||
|         # 检查下载目录
 | ||
|         download_paths: List[str] = []
 | ||
|         for path in [settings.DOWNLOAD_PATH,
 | ||
|                      settings.DOWNLOAD_MOVIE_PATH,
 | ||
|                      settings.DOWNLOAD_TV_PATH,
 | ||
|                      settings.DOWNLOAD_ANIME_PATH]:
 | ||
|             if not path:
 | ||
|                 continue
 | ||
|             download_path = Path(path)
 | ||
|             if not download_path.exists():
 | ||
|                 return False, f"下载目录 {download_path} 不存在"
 | ||
|             download_paths.append(path)
 | ||
|         # 下载目录的设备ID
 | ||
|         download_devids = [Path(path).stat().st_dev for path in download_paths]
 | ||
|         # 检查媒体库目录
 | ||
|         if not settings.LIBRARY_PATH:
 | ||
|             return False, "媒体库目录未设置"
 | ||
|         # 比较媒体库目录的设备ID
 | ||
|         for path in settings.LIBRARY_PATHS:
 | ||
|             library_path = Path(path)
 | ||
|             if not library_path.exists():
 | ||
|                 return False, f"媒体库目录不存在:{library_path}"
 | ||
|             if settings.DOWNLOADER_MONITOR and settings.TRANSFER_TYPE == "link":
 | ||
|                 if library_path.stat().st_dev not in download_devids:
 | ||
|                     return False, f"媒体库目录 {library_path} " \
 | ||
|                                   f"与下载目录 {','.join(download_paths)} 不在同一设备,将无法硬链接"
 | ||
|         return True, ""
 | ||
| 
 | ||
|     def init_setting(self) -> Tuple[str, Union[str, bool]]:
 | ||
|         pass
 | ||
| 
 | ||
|     def transfer(self, path: Path, meta: MetaBase, mediainfo: MediaInfo,
 | ||
|                  transfer_type: str, target: Path = None,
 | ||
|                  episodes_info: List[TmdbEpisode] = None) -> TransferInfo:
 | ||
|         """
 | ||
|         文件转移
 | ||
|         :param path:  文件路径
 | ||
|         :param meta: 预识别的元数据,仅单文件转移时传递
 | ||
|         :param mediainfo:  识别的媒体信息
 | ||
|         :param transfer_type:  转移方式
 | ||
|         :param target:  目标路径
 | ||
|         :param episodes_info: 当前季的全部集信息
 | ||
|         :return: {path, target_path, message}
 | ||
|         """
 | ||
|         # 获取目标路径
 | ||
|         if not target:
 | ||
|             # 未指定目的目录,根据源目录选择一个媒体库
 | ||
|             target = self.get_target_path(in_path=path)
 | ||
|             # 拼装媒体库一、二级子目录
 | ||
|             target = self.__get_dest_dir(mediainfo=mediainfo, target_dir=target)
 | ||
|         else:
 | ||
|             # 指定了目的目录
 | ||
|             if target.is_file():
 | ||
|                 logger.error(f"转移目标路径是一个文件 {target} 是一个文件")
 | ||
|                 return TransferInfo(success=False,
 | ||
|                                     path=path,
 | ||
|                                     message=f"{target} 不是有效目录")
 | ||
|             # 只拼装二级子目录(不要一级目录)
 | ||
|             target = self.__get_dest_dir(mediainfo=mediainfo, target_dir=target, typename_dir=False)
 | ||
| 
 | ||
|         if not target:
 | ||
|             logger.error("未找到媒体库目录,无法转移文件")
 | ||
|             return TransferInfo(success=False,
 | ||
|                                 path=path,
 | ||
|                                 message="未找到媒体库目录")
 | ||
|         else:
 | ||
|             logger.info(f"获取转移目标路径:{target}")
 | ||
| 
 | ||
|         # 转移
 | ||
|         return self.transfer_media(in_path=path,
 | ||
|                                    in_meta=meta,
 | ||
|                                    mediainfo=mediainfo,
 | ||
|                                    transfer_type=transfer_type,
 | ||
|                                    target_dir=target,
 | ||
|                                    episodes_info=episodes_info)
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     def __transfer_command(file_item: Path, target_file: Path, transfer_type: str) -> int:
 | ||
|         """
 | ||
|         使用系统命令处理单个文件
 | ||
|         :param file_item: 文件路径
 | ||
|         :param target_file: 目标文件路径
 | ||
|         :param transfer_type: RmtMode转移方式
 | ||
|         """
 | ||
|         with lock:
 | ||
| 
 | ||
|             # 转移
 | ||
|             if transfer_type == 'link':
 | ||
|                 # 硬链接
 | ||
|                 retcode, retmsg = SystemUtils.link(file_item, target_file)
 | ||
|             elif transfer_type == 'softlink':
 | ||
|                 # 软链接
 | ||
|                 retcode, retmsg = SystemUtils.softlink(file_item, target_file)
 | ||
|             elif transfer_type == 'move':
 | ||
|                 # 移动
 | ||
|                 retcode, retmsg = SystemUtils.move(file_item, target_file)
 | ||
|             elif transfer_type == 'rclone_move':
 | ||
|                 # Rclone 移动
 | ||
|                 retcode, retmsg = SystemUtils.rclone_move(file_item, target_file)
 | ||
|             elif transfer_type == 'rclone_copy':
 | ||
|                 # Rclone 复制
 | ||
|                 retcode, retmsg = SystemUtils.rclone_copy(file_item, target_file)
 | ||
|             else:
 | ||
|                 # 复制
 | ||
|                 retcode, retmsg = SystemUtils.copy(file_item, target_file)
 | ||
| 
 | ||
|         if retcode != 0:
 | ||
|             logger.error(retmsg)
 | ||
| 
 | ||
|         return retcode
 | ||
| 
 | ||
|     def __transfer_other_files(self, org_path: Path, new_path: Path,
 | ||
|                                transfer_type: str, over_flag: bool) -> int:
 | ||
|         """
 | ||
|         根据文件名转移其他相关文件
 | ||
|         :param org_path: 原文件名
 | ||
|         :param new_path: 新文件名
 | ||
|         :param transfer_type: RmtMode转移方式
 | ||
|         :param over_flag: 是否覆盖,为True时会先删除再转移
 | ||
|         """
 | ||
|         retcode = self.__transfer_subtitles(org_path, new_path, transfer_type)
 | ||
|         if retcode != 0:
 | ||
|             return retcode
 | ||
|         retcode = self.__transfer_audio_track_files(org_path, new_path, transfer_type, over_flag)
 | ||
|         if retcode != 0:
 | ||
|             return retcode
 | ||
|         return 0
 | ||
| 
 | ||
|     def __transfer_subtitles(self, org_path: Path, new_path: Path, transfer_type: str) -> int:
 | ||
|         """
 | ||
|         根据文件名转移对应字幕文件
 | ||
|         :param org_path: 原文件名
 | ||
|         :param new_path: 新文件名
 | ||
|         :param transfer_type: RmtMode转移方式
 | ||
|         """
 | ||
|         # 字幕正则式
 | ||
|         _zhcn_sub_re = r"([.\[(](((zh[-_])?(cn|ch[si]|sg|sc))|zho?" \
 | ||
|                        r"|chinese|(cn|ch[si]|sg|zho?|eng)[-_&](cn|ch[si]|sg|zho?|eng)" \
 | ||
|                        r"|简[体中]?)[.\])])" \
 | ||
|                        r"|([\u4e00-\u9fa5]{0,3}[中双][\u4e00-\u9fa5]{0,2}[字文语][\u4e00-\u9fa5]{0,3})" \
 | ||
|                        r"|简体|简中|JPSC" \
 | ||
|                        r"|(?<![a-z0-9])gb(?![a-z0-9])"
 | ||
|         _zhtw_sub_re = r"([.\[(](((zh[-_])?(hk|tw|cht|tc))" \
 | ||
|                        r"|繁[体中]?)[.\])])" \
 | ||
|                        r"|繁体中[文字]|中[文字]繁体|繁体|JPTC" \
 | ||
|                        r"|(?<![a-z0-9])big5(?![a-z0-9])"
 | ||
|         _eng_sub_re = r"[.\[(]eng[.\])]"
 | ||
| 
 | ||
|         # 比对文件名并转移字幕
 | ||
|         org_dir: Path = org_path.parent
 | ||
|         file_list: List[Path] = SystemUtils.list_files(org_dir, settings.RMT_SUBEXT)
 | ||
|         if len(file_list) == 0:
 | ||
|             logger.debug(f"{org_dir} 目录下没有找到字幕文件...")
 | ||
|         else:
 | ||
|             logger.debug("字幕文件清单:" + str(file_list))
 | ||
|             # 识别文件名
 | ||
|             metainfo = MetaInfo(title=org_path.name)
 | ||
|             for file_item in file_list:
 | ||
|                 # 识别字幕文件名
 | ||
|                 sub_file_name = re.sub(_zhtw_sub_re,
 | ||
|                                        ".",
 | ||
|                                        re.sub(_zhcn_sub_re,
 | ||
|                                               ".",
 | ||
|                                               file_item.name,
 | ||
|                                               flags=re.I),
 | ||
|                                        flags=re.I)
 | ||
|                 sub_file_name = re.sub(_eng_sub_re, ".", sub_file_name, flags=re.I)
 | ||
|                 sub_metainfo = MetaInfo(title=file_item.name)
 | ||
|                 # 匹配字幕文件名
 | ||
|                 if (org_path.stem == Path(sub_file_name).stem) or \
 | ||
|                         (sub_metainfo.cn_name and sub_metainfo.cn_name == metainfo.cn_name) \
 | ||
|                         or (sub_metainfo.en_name and sub_metainfo.en_name == metainfo.en_name):
 | ||
|                     if metainfo.part and metainfo.part != sub_metainfo.part:
 | ||
|                         continue
 | ||
|                     if metainfo.season \
 | ||
|                             and metainfo.season != sub_metainfo.season:
 | ||
|                         continue
 | ||
|                     if metainfo.episode \
 | ||
|                             and metainfo.episode != sub_metainfo.episode:
 | ||
|                         continue
 | ||
|                     new_file_type = ""
 | ||
|                     # 兼容jellyfin字幕识别(多重识别), emby则会识别最后一个后缀
 | ||
|                     if re.search(_zhcn_sub_re, file_item.name, re.I):
 | ||
|                         new_file_type = ".chi.zh-cn"
 | ||
|                     elif re.search(_zhtw_sub_re, file_item.name,
 | ||
|                                    re.I):
 | ||
|                         new_file_type = ".zh-tw"
 | ||
|                     elif re.search(_eng_sub_re, file_item.name, re.I):
 | ||
|                         new_file_type = ".eng"
 | ||
|                     # 通过对比字幕文件大小  尽量转移所有存在的字幕
 | ||
|                     file_ext = file_item.suffix
 | ||
|                     new_sub_tag_dict = {
 | ||
|                         ".eng": ".英文",
 | ||
|                         ".chi.zh-cn": ".简体中文",
 | ||
|                         ".zh-tw": ".繁体中文"
 | ||
|                     }
 | ||
|                     new_sub_tag_list = [
 | ||
|                         new_file_type if t == 0 else "%s%s(%s)" % (new_file_type,
 | ||
|                                                                    new_sub_tag_dict.get(
 | ||
|                                                                        new_file_type, ""
 | ||
|                                                                    ),
 | ||
|                                                                    t) for t in range(6)
 | ||
|                     ]
 | ||
|                     for new_sub_tag in new_sub_tag_list:
 | ||
|                         new_file: Path = new_path.with_name(new_path.stem + new_sub_tag + file_ext)
 | ||
|                         # 如果字幕文件不存在, 直接转移字幕, 并跳出循环
 | ||
|                         try:
 | ||
|                             if not new_file.exists():
 | ||
|                                 logger.debug(f"正在处理字幕:{file_item.name}")
 | ||
|                                 retcode = self.__transfer_command(file_item=file_item,
 | ||
|                                                                   target_file=new_file,
 | ||
|                                                                   transfer_type=transfer_type)
 | ||
|                                 if retcode == 0:
 | ||
|                                     logger.info(f"字幕 {file_item.name} {transfer_type}完成")
 | ||
|                                     break
 | ||
|                                 else:
 | ||
|                                     logger.error(f"字幕 {file_item.name} {transfer_type}失败,错误码 {retcode}")
 | ||
|                                     return retcode
 | ||
|                             # 如果字幕文件的大小与已存在文件相同, 说明已经转移过了, 则跳出循环
 | ||
|                             elif new_file.stat().st_size == file_item.stat().st_size:
 | ||
|                                 logger.info(f"字幕 new_file 已存在")
 | ||
|                                 break
 | ||
|                             # 否则 循环继续 > 通过new_sub_tag_list 获取新的tag附加到字幕文件名, 继续检查是否能转移
 | ||
|                         except OSError as reason:
 | ||
|                             logger.info(f"字幕 {new_file} 出错了,原因: {reason}")
 | ||
|         return 0
 | ||
| 
 | ||
|     def __transfer_audio_track_files(self, org_path: Path, new_path: Path,
 | ||
|                                      transfer_type: str, over_flag: bool) -> int:
 | ||
|         """
 | ||
|         根据文件名转移对应音轨文件
 | ||
|         :param org_path: 原文件名
 | ||
|         :param new_path: 新文件名
 | ||
|         :param transfer_type: RmtMode转移方式
 | ||
|         :param over_flag: 是否覆盖,为True时会先删除再转移
 | ||
|         """
 | ||
|         dir_name = org_path.parent
 | ||
|         file_name = org_path.name
 | ||
|         file_list: List[Path] = SystemUtils.list_files(dir_name, ['.mka'])
 | ||
|         pending_file_list: List[Path] = [file for file in file_list if org_path.stem == file.stem]
 | ||
|         if len(pending_file_list) == 0:
 | ||
|             logger.debug(f"{dir_name} 目录下没有找到匹配的音轨文件")
 | ||
|         else:
 | ||
|             logger.debug("音轨文件清单:" + str(pending_file_list))
 | ||
|             for track_file in pending_file_list:
 | ||
|                 track_ext = track_file.suffix
 | ||
|                 new_track_file = new_path.with_name(new_path.stem + track_ext)
 | ||
|                 if new_track_file.exists():
 | ||
|                     if not over_flag:
 | ||
|                         logger.warn(f"音轨文件已存在:{new_track_file}")
 | ||
|                         continue
 | ||
|                     else:
 | ||
|                         logger.info(f"正在删除已存在的音轨文件:{new_track_file}")
 | ||
|                         new_track_file.unlink()
 | ||
|                 try:
 | ||
|                     logger.info(f"正在转移音轨文件:{track_file} 到 {new_track_file}")
 | ||
|                     retcode = self.__transfer_command(file_item=track_file,
 | ||
|                                                       target_file=new_track_file,
 | ||
|                                                       transfer_type=transfer_type)
 | ||
|                     if retcode == 0:
 | ||
|                         logger.info(f"音轨文件 {file_name} {transfer_type}完成")
 | ||
|                     else:
 | ||
|                         logger.error(f"音轨文件 {file_name} {transfer_type}失败,错误码:{retcode}")
 | ||
|                 except OSError as reason:
 | ||
|                     logger.error(f"音轨文件 {file_name} {transfer_type}失败:{reason}")
 | ||
|         return 0
 | ||
| 
 | ||
|     def __transfer_dir(self, file_path: Path, new_path: Path, transfer_type: str) -> int:
 | ||
|         """
 | ||
|         转移整个文件夹
 | ||
|         :param file_path: 原路径
 | ||
|         :param new_path: 新路径
 | ||
|         :param transfer_type: RmtMode转移方式
 | ||
|         """
 | ||
|         logger.info(f"正在{transfer_type}目录:{file_path} 到 {new_path}")
 | ||
|         # 复制
 | ||
|         retcode = self.__transfer_dir_files(src_dir=file_path,
 | ||
|                                             target_dir=new_path,
 | ||
|                                             transfer_type=transfer_type)
 | ||
|         if retcode == 0:
 | ||
|             logger.info(f"文件 {file_path} {transfer_type}完成")
 | ||
|         else:
 | ||
|             logger.error(f"文件{file_path} {transfer_type}失败,错误码:{retcode}")
 | ||
| 
 | ||
|         return retcode
 | ||
| 
 | ||
|     def __transfer_dir_files(self, src_dir: Path, target_dir: Path, transfer_type: str) -> int:
 | ||
|         """
 | ||
|         按目录结构转移目录下所有文件
 | ||
|         :param src_dir: 原路径
 | ||
|         :param target_dir: 新路径
 | ||
|         :param transfer_type: RmtMode转移方式
 | ||
|         """
 | ||
|         retcode = 0
 | ||
|         for file in src_dir.glob("**/*"):
 | ||
|             # 过滤掉目录
 | ||
|             if file.is_dir():
 | ||
|                 continue
 | ||
|             # 使用target_dir的父目录作为新的父目录
 | ||
|             new_file = target_dir.joinpath(file.relative_to(src_dir))
 | ||
|             if new_file.exists():
 | ||
|                 logger.warn(f"{new_file} 文件已存在")
 | ||
|                 continue
 | ||
|             if not new_file.parent.exists():
 | ||
|                 new_file.parent.mkdir(parents=True, exist_ok=True)
 | ||
|             retcode = self.__transfer_command(file_item=file,
 | ||
|                                               target_file=new_file,
 | ||
|                                               transfer_type=transfer_type)
 | ||
|             if retcode != 0:
 | ||
|                 break
 | ||
| 
 | ||
|         return retcode
 | ||
| 
 | ||
|     def __transfer_file(self, file_item: Path, new_file: Path, transfer_type: str,
 | ||
|                         over_flag: bool = False) -> int:
 | ||
|         """
 | ||
|         转移一个文件,同时处理其他相关文件
 | ||
|         :param file_item: 原文件路径
 | ||
|         :param new_file: 新文件路径
 | ||
|         :param transfer_type: RmtMode转移方式
 | ||
|         :param over_flag: 是否覆盖,为True时会先删除再转移
 | ||
|         """
 | ||
|         if new_file.exists() or new_file.is_symlink():
 | ||
|             if not over_flag:
 | ||
|                 logger.warn(f"文件已存在:{new_file}")
 | ||
|                 return 0
 | ||
|             else:
 | ||
|                 logger.info(f"正在删除已存在的文件:{new_file}")
 | ||
|                 new_file.unlink()
 | ||
|         logger.info(f"正在转移文件:{file_item} 到 {new_file}")
 | ||
|         # 创建父目录
 | ||
|         new_file.parent.mkdir(parents=True, exist_ok=True)
 | ||
|         retcode = self.__transfer_command(file_item=file_item,
 | ||
|                                           target_file=new_file,
 | ||
|                                           transfer_type=transfer_type)
 | ||
|         if retcode == 0:
 | ||
|             logger.info(f"文件 {file_item} {transfer_type}完成")
 | ||
|         else:
 | ||
|             logger.error(f"文件 {file_item} {transfer_type}失败,错误码:{retcode}")
 | ||
|             return retcode
 | ||
|         # 处理其他相关文件
 | ||
|         return self.__transfer_other_files(org_path=file_item,
 | ||
|                                            new_path=new_file,
 | ||
|                                            transfer_type=transfer_type,
 | ||
|                                            over_flag=over_flag)
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     def __get_dest_dir(mediainfo: MediaInfo, target_dir: Path, typename_dir: bool = True) -> Path:
 | ||
|         """
 | ||
|         根据设置并装媒体库目录
 | ||
|         :param mediainfo: 媒体信息
 | ||
|         :target_dir: 媒体库根目录
 | ||
|         :typename_dir: 是否加上类型目录
 | ||
|         """
 | ||
|         if not target_dir:
 | ||
|             return target_dir
 | ||
| 
 | ||
|         if mediainfo.type == MediaType.MOVIE:
 | ||
|             # 电影
 | ||
|             if typename_dir:
 | ||
|                 # 目的目录加上类型和二级分类
 | ||
|                 target_dir = target_dir / settings.LIBRARY_MOVIE_NAME / mediainfo.category
 | ||
|             else:
 | ||
|                 # 目的目录加上二级分类
 | ||
|                 target_dir = target_dir / mediainfo.category
 | ||
| 
 | ||
|         if mediainfo.type == MediaType.TV:
 | ||
|             # 电视剧
 | ||
|             if mediainfo.genre_ids \
 | ||
|                     and set(mediainfo.genre_ids).intersection(set(settings.ANIME_GENREIDS)):
 | ||
|                 # 动漫
 | ||
|                 if typename_dir:
 | ||
|                     target_dir = target_dir / (settings.LIBRARY_ANIME_NAME
 | ||
|                                                or settings.LIBRARY_TV_NAME) / mediainfo.category
 | ||
|                 else:
 | ||
|                     target_dir = target_dir / mediainfo.category
 | ||
|             else:
 | ||
|                 # 电视剧
 | ||
|                 if typename_dir:
 | ||
|                     target_dir = target_dir / settings.LIBRARY_TV_NAME / mediainfo.category
 | ||
|                 else:
 | ||
|                     target_dir = target_dir / mediainfo.category
 | ||
| 
 | ||
|         return target_dir
 | ||
| 
 | ||
|     def transfer_media(self,
 | ||
|                        in_path: Path,
 | ||
|                        in_meta: MetaBase,
 | ||
|                        mediainfo: MediaInfo,
 | ||
|                        transfer_type: str,
 | ||
|                        target_dir: Path,
 | ||
|                        episodes_info: List[TmdbEpisode] = None
 | ||
|                        ) -> TransferInfo:
 | ||
|         """
 | ||
|         识别并转移一个文件或者一个目录下的所有文件
 | ||
|         :param in_path: 转移的路径,可能是一个文件也可以是一个目录
 | ||
|         :param in_meta:预识别元数据
 | ||
|         :param mediainfo: 媒体信息
 | ||
|         :param target_dir: 媒体库根目录
 | ||
|         :param transfer_type: 文件转移方式
 | ||
|         :param episodes_info: 当前季的全部集信息
 | ||
|         :return: TransferInfo、错误信息
 | ||
|         """
 | ||
|         # 检查目录路径
 | ||
|         if not in_path.exists():
 | ||
|             return TransferInfo(success=False,
 | ||
|                                 path=in_path,
 | ||
|                                 message=f"{in_path} 路径不存在")
 | ||
| 
 | ||
|         if transfer_type not in ['rclone_copy', 'rclone_move']:
 | ||
|             # 检查目标路径
 | ||
|             if not target_dir.exists():
 | ||
|                 logger.info(f"目标路径不存在,正在创建:{target_dir} ...")
 | ||
|                 target_dir.mkdir(parents=True, exist_ok=True)
 | ||
| 
 | ||
|         # 重命名格式
 | ||
|         rename_format = settings.TV_RENAME_FORMAT \
 | ||
|             if mediainfo.type == MediaType.TV else settings.MOVIE_RENAME_FORMAT
 | ||
| 
 | ||
|         # 判断是否为文件夹
 | ||
|         if in_path.is_dir():
 | ||
|             # 转移整个目录
 | ||
|             # 是否蓝光原盘
 | ||
|             bluray_flag = SystemUtils.is_bluray_dir(in_path)
 | ||
|             if bluray_flag:
 | ||
|                 logger.info(f"{in_path} 是蓝光原盘文件夹")
 | ||
|             # 原文件大小
 | ||
|             file_size = in_path.stat().st_size
 | ||
|             # 目的路径
 | ||
|             new_path = self.get_rename_path(
 | ||
|                 path=target_dir,
 | ||
|                 template_string=rename_format,
 | ||
|                 rename_dict=self.__get_naming_dict(meta=in_meta,
 | ||
|                                                    mediainfo=mediainfo)
 | ||
|             ).parent
 | ||
|             # 转移蓝光原盘
 | ||
|             retcode = self.__transfer_dir(file_path=in_path,
 | ||
|                                           new_path=new_path,
 | ||
|                                           transfer_type=transfer_type)
 | ||
|             if retcode != 0:
 | ||
|                 logger.error(f"文件夹 {in_path} 转移失败,错误码:{retcode}")
 | ||
|                 return TransferInfo(success=False,
 | ||
|                                     message=f"错误码:{retcode}",
 | ||
|                                     path=in_path,
 | ||
|                                     target_path=new_path,
 | ||
|                                     is_bluray=bluray_flag)
 | ||
| 
 | ||
|             logger.info(f"文件夹 {in_path} 转移成功")
 | ||
|             # 返回转移后的路径
 | ||
|             return TransferInfo(success=True,
 | ||
|                                 path=in_path,
 | ||
|                                 target_path=new_path,
 | ||
|                                 total_size=file_size,
 | ||
|                                 is_bluray=bluray_flag)
 | ||
|         else:
 | ||
|             # 转移单个文件
 | ||
|             if mediainfo.type == MediaType.TV:
 | ||
|                 # 电视剧
 | ||
|                 if in_meta.begin_episode is None:
 | ||
|                     logger.warn(f"文件 {in_path} 转移失败:未识别到文件集数")
 | ||
|                     return TransferInfo(success=False,
 | ||
|                                         message=f"未识别到文件集数",
 | ||
|                                         path=in_path,
 | ||
|                                         fail_list=[str(in_path)])
 | ||
| 
 | ||
|                 # 文件结束季为空
 | ||
|                 in_meta.end_season = None
 | ||
|                 # 文件总季数为1
 | ||
|                 if in_meta.total_season:
 | ||
|                     in_meta.total_season = 1
 | ||
|                 # 文件不可能超过2集
 | ||
|                 if in_meta.total_episode > 2:
 | ||
|                     in_meta.total_episode = 1
 | ||
|                     in_meta.end_episode = None
 | ||
| 
 | ||
|             # 目的文件名
 | ||
|             new_file = self.get_rename_path(
 | ||
|                 path=target_dir,
 | ||
|                 template_string=rename_format,
 | ||
|                 rename_dict=self.__get_naming_dict(
 | ||
|                     meta=in_meta,
 | ||
|                     mediainfo=mediainfo,
 | ||
|                     episodes_info=episodes_info,
 | ||
|                     file_ext=in_path.suffix
 | ||
|                 )
 | ||
|             )
 | ||
| 
 | ||
|             # 判断是否要覆盖
 | ||
|             overflag = False
 | ||
|             target_file = new_file
 | ||
|             if new_file.exists() or new_file.is_symlink():
 | ||
|                 if new_file.is_symlink():
 | ||
|                     target_file = new_file.readlink()
 | ||
|                     if not target_file.exists():
 | ||
|                         overflag = True
 | ||
|                 if not overflag:
 | ||
|                     # 目标文件已存在
 | ||
|                     logger.info(f"目标文件已存在,转移覆盖模式:{settings.OVERWRITE_MODE}")
 | ||
|                     match settings.OVERWRITE_MODE:
 | ||
|                         case 'always':
 | ||
|                             # 总是覆盖同名文件
 | ||
|                             overflag = True
 | ||
|                         case 'size':
 | ||
|                             # 存在时大覆盖小
 | ||
|                             if target_file.stat().st_size < in_path.stat().st_size:
 | ||
|                                 logger.info(f"目标文件文件大小更小,将覆盖:{new_file}")
 | ||
|                                 overflag = True
 | ||
|                             else:
 | ||
|                                 return TransferInfo(success=False,
 | ||
|                                                     message=f"媒体库中已存在,且质量更好",
 | ||
|                                                     path=in_path,
 | ||
|                                                     target_path=new_file,
 | ||
|                                                     fail_list=[str(in_path)])
 | ||
|                         case 'never':
 | ||
|                             # 存在不覆盖
 | ||
|                             return TransferInfo(success=False,
 | ||
|                                                 message=f"媒体库中已存在,当前设置为不覆盖",
 | ||
|                                                 path=in_path,
 | ||
|                                                 target_path=new_file,
 | ||
|                                                 fail_list=[str(in_path)])
 | ||
|                         case 'latest':
 | ||
|                             # 仅保留最新版本
 | ||
|                             logger.info(f"仅保留最新版本,将覆盖:{new_file}")
 | ||
|                             overflag = True
 | ||
|             else:
 | ||
|                 if settings.OVERWRITE_MODE == 'latest':
 | ||
|                     # 文件不存在,但仅保留最新版本
 | ||
|                     logger.info(f"转移覆盖模式:{settings.OVERWRITE_MODE},仅保留最新版本")
 | ||
|                     self.delete_all_version_files(new_file)
 | ||
|             # 原文件大小
 | ||
|             file_size = in_path.stat().st_size
 | ||
|             # 转移文件
 | ||
|             retcode = self.__transfer_file(file_item=in_path,
 | ||
|                                            new_file=new_file,
 | ||
|                                            transfer_type=transfer_type,
 | ||
|                                            over_flag=overflag)
 | ||
|             if retcode != 0:
 | ||
|                 logger.error(f"文件 {in_path} 转移失败,错误码:{retcode}")
 | ||
|                 return TransferInfo(success=False,
 | ||
|                                     message=f"错误码:{retcode}",
 | ||
|                                     path=in_path,
 | ||
|                                     target_path=new_file,
 | ||
|                                     fail_list=[str(in_path)])
 | ||
| 
 | ||
|             logger.info(f"文件 {in_path} 转移成功")
 | ||
|             return TransferInfo(success=True,
 | ||
|                                 path=in_path,
 | ||
|                                 target_path=new_file,
 | ||
|                                 file_count=1,
 | ||
|                                 total_size=file_size,
 | ||
|                                 is_bluray=False,
 | ||
|                                 file_list=[str(in_path)],
 | ||
|                                 file_list_new=[str(new_file)])
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     def __get_naming_dict(meta: MetaBase, mediainfo: MediaInfo, file_ext: str = None,
 | ||
|                           episodes_info: List[TmdbEpisode] = None) -> dict:
 | ||
|         """
 | ||
|         根据媒体信息,返回Format字典
 | ||
|         :param meta: 文件元数据
 | ||
|         :param mediainfo: 识别的媒体信息
 | ||
|         :param file_ext: 文件扩展名
 | ||
|         :param episodes_info: 当前季的全部集信息
 | ||
|         """
 | ||
| 
 | ||
|         def __convert_invalid_characters(filename: str):
 | ||
|             if not filename:
 | ||
|                 return filename
 | ||
|             invalid_characters = r'\/:*?"<>|'
 | ||
|             # 创建半角到全角字符的转换表
 | ||
|             halfwidth_chars = "".join([chr(i) for i in range(33, 127)])
 | ||
|             fullwidth_chars = "".join([chr(i + 0xFEE0) for i in range(33, 127)])
 | ||
|             translation_table = str.maketrans(halfwidth_chars, fullwidth_chars)
 | ||
|             # 将不支持的字符替换为对应的全角字符
 | ||
|             for char in invalid_characters:
 | ||
|                 filename = filename.replace(char, char.translate(translation_table))
 | ||
|             return filename
 | ||
| 
 | ||
|         # 获取集标题
 | ||
|         episode_title = None
 | ||
|         if meta.begin_episode and episodes_info:
 | ||
|             for episode in episodes_info:
 | ||
|                 if episode.episode_number == meta.begin_episode:
 | ||
|                     episode_title = episode.name
 | ||
|                     break
 | ||
| 
 | ||
|         return {
 | ||
|             # 标题
 | ||
|             "title": __convert_invalid_characters(mediainfo.title),
 | ||
|             # 英文标题
 | ||
|             "en_title": __convert_invalid_characters(mediainfo.en_title),
 | ||
|             # 原语种标题
 | ||
|             "original_title": __convert_invalid_characters(mediainfo.original_title),
 | ||
|             # 原文件名
 | ||
|             "original_name": f"{meta.org_string}{file_ext}",
 | ||
|             # 识别名称(优先使用中文)
 | ||
|             "name": meta.name,
 | ||
|             # 识别的英文名称(可能为空)
 | ||
|             "en_name": meta.en_name,
 | ||
|             # 年份
 | ||
|             "year": mediainfo.year or meta.year,
 | ||
|             # 资源类型
 | ||
|             "resourceType": meta.resource_type,
 | ||
|             # 特效
 | ||
|             "effect": meta.resource_effect,
 | ||
|             # 版本
 | ||
|             "edition": meta.edition,
 | ||
|             # 分辨率
 | ||
|             "videoFormat": meta.resource_pix,
 | ||
|             # 制作组/字幕组
 | ||
|             "releaseGroup": meta.resource_team,
 | ||
|             # 视频编码
 | ||
|             "videoCodec": meta.video_encode,
 | ||
|             # 音频编码
 | ||
|             "audioCodec": meta.audio_encode,
 | ||
|             # TMDBID
 | ||
|             "tmdbid": mediainfo.tmdb_id,
 | ||
|             # IMDBID
 | ||
|             "imdbid": mediainfo.imdb_id,
 | ||
|             # 豆瓣ID
 | ||
|             "doubanid": mediainfo.douban_id,
 | ||
|             # 季号
 | ||
|             "season": meta.season_seq,
 | ||
|             # 集号
 | ||
|             "episode": meta.episode_seqs,
 | ||
|             # 季集 SxxExx
 | ||
|             "season_episode": "%s%s" % (meta.season, meta.episodes),
 | ||
|             # 段/节
 | ||
|             "part": meta.part,
 | ||
|             # 剧集标题
 | ||
|             "episode_title": episode_title,
 | ||
|             # 文件后缀
 | ||
|             "fileExt": file_ext,
 | ||
|             # 自定义占位符
 | ||
|             "customization": meta.customization
 | ||
|         }
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     def get_rename_path(template_string: str, rename_dict: dict, path: Path = None) -> Path:
 | ||
|         """
 | ||
|         生成重命名后的完整路径
 | ||
|         """
 | ||
|         # 创建jinja2模板对象
 | ||
|         template = Template(template_string)
 | ||
|         # 渲染生成的字符串
 | ||
|         render_str = template.render(rename_dict)
 | ||
|         # 目的路径
 | ||
|         if path:
 | ||
|             return path / render_str
 | ||
|         else:
 | ||
|             return Path(render_str)
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     def get_library_path(path: Path):
 | ||
|         """
 | ||
|         根据文件路径查询其所在的媒体库目录,查询不到的返回输入目录
 | ||
|         """
 | ||
|         if not path:
 | ||
|             return None
 | ||
|         if not settings.LIBRARY_PATHS:
 | ||
|             return path
 | ||
|         # 目的路径,多路径以,分隔
 | ||
|         dest_paths = settings.LIBRARY_PATHS
 | ||
|         for libpath in dest_paths:
 | ||
|             try:
 | ||
|                 if path.is_relative_to(libpath):
 | ||
|                     return libpath
 | ||
|             except Exception as e:
 | ||
|                 logger.debug(f"计算媒体库路径时出错:{str(e)}")
 | ||
|                 continue
 | ||
|         return path
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     def get_target_path(in_path: Path = None) -> Optional[Path]:
 | ||
|         """
 | ||
|         计算一个最好的目的目录,有in_path时找与in_path同路径的,没有in_path时,顺序查找1个符合大小要求的,没有in_path和size时,返回第1个
 | ||
|         :param in_path: 源目录
 | ||
|         """
 | ||
|         if not settings.LIBRARY_PATHS:
 | ||
|             return None
 | ||
|         # 目的路径,多路径以,分隔
 | ||
|         dest_paths = settings.LIBRARY_PATHS
 | ||
|         # 只有一个路径,直接返回
 | ||
|         if len(dest_paths) == 1:
 | ||
|             return dest_paths[0]
 | ||
|         # 匹配有最长共同上级路径的目录
 | ||
|         max_length = 0
 | ||
|         target_path = None
 | ||
|         if in_path:
 | ||
|             for path in dest_paths:
 | ||
|                 try:
 | ||
|                     # 计算in_path和path的公共字符串长度
 | ||
|                     relative = StringUtils.find_common_prefix(str(in_path), str(path))
 | ||
|                     if len(relative) > max_length:
 | ||
|                         max_length = len(relative)
 | ||
|                         target_path = path
 | ||
|                 except Exception as e:
 | ||
|                     logger.debug(f"计算目标路径时出错:{str(e)}")
 | ||
|                     continue
 | ||
|             if target_path:
 | ||
|                 return target_path
 | ||
|             # 顺序匹配第1个满足空间存储要求的目录
 | ||
|             if in_path.exists():
 | ||
|                 file_size = in_path.stat().st_size
 | ||
|                 for path in dest_paths:
 | ||
|                     if SystemUtils.free_space(path) > file_size:
 | ||
|                         return path
 | ||
|         # 默认返回第1个
 | ||
|         return dest_paths[0]
 | ||
| 
 | ||
|     def media_exists(self, mediainfo: MediaInfo, **kwargs) -> Optional[ExistMediaInfo]:
 | ||
|         """
 | ||
|         判断媒体文件是否存在于本地文件系统,只支持标准媒体库结构
 | ||
|         :param mediainfo:  识别的媒体信息
 | ||
|         :return: 如不存在返回None,存在时返回信息,包括每季已存在所有集{type: movie/tv, seasons: {season: [episodes]}}
 | ||
|         """
 | ||
|         if not settings.LIBRARY_PATHS:
 | ||
|             return None
 | ||
|         # 目的路径
 | ||
|         dest_paths = settings.LIBRARY_PATHS
 | ||
|         # 检查每一个媒体库目录
 | ||
|         for dest_path in dest_paths:
 | ||
|             # 媒体库路径
 | ||
|             target_dir = self.get_target_path(dest_path)
 | ||
|             if not target_dir:
 | ||
|                 continue
 | ||
|             # 媒体分类路径
 | ||
|             target_dir = self.__get_dest_dir(mediainfo=mediainfo, target_dir=target_dir)
 | ||
|             # 重命名格式
 | ||
|             rename_format = settings.TV_RENAME_FORMAT \
 | ||
|                 if mediainfo.type == MediaType.TV else settings.MOVIE_RENAME_FORMAT
 | ||
|             # 相对路径
 | ||
|             meta = MetaInfo(mediainfo.title)
 | ||
|             rel_path = self.get_rename_path(
 | ||
|                 template_string=rename_format,
 | ||
|                 rename_dict=self.__get_naming_dict(meta=meta,
 | ||
|                                                    mediainfo=mediainfo)
 | ||
|             )
 | ||
|             # 取相对路径的第1层目录
 | ||
|             if rel_path.parts:
 | ||
|                 media_path = target_dir / rel_path.parts[0]
 | ||
|             else:
 | ||
|                 continue
 | ||
| 
 | ||
|             # 检查媒体文件夹是否存在
 | ||
|             if not media_path.exists():
 | ||
|                 continue
 | ||
| 
 | ||
|             # 检索媒体文件
 | ||
|             media_files = SystemUtils.list_files(directory=media_path, extensions=settings.RMT_MEDIAEXT)
 | ||
|             if not media_files:
 | ||
|                 continue
 | ||
| 
 | ||
|             if mediainfo.type == MediaType.MOVIE:
 | ||
|                 # 电影存在任何文件为存在
 | ||
|                 logger.info(f"文件系统已存在:{mediainfo.title_year}")
 | ||
|                 return ExistMediaInfo(type=MediaType.MOVIE)
 | ||
|             else:
 | ||
|                 # 电视剧检索集数
 | ||
|                 seasons: Dict[int, list] = {}
 | ||
|                 for media_file in media_files:
 | ||
|                     file_meta = MetaInfo(media_file.stem)
 | ||
|                     season_index = file_meta.begin_season or 1
 | ||
|                     episode_index = file_meta.begin_episode
 | ||
|                     if not episode_index:
 | ||
|                         continue
 | ||
|                     if season_index not in seasons:
 | ||
|                         seasons[season_index] = []
 | ||
|                     seasons[season_index].append(episode_index)
 | ||
|                 # 返回剧集情况
 | ||
|                 logger.info(f"{mediainfo.title_year} 文件系统已存在:{seasons}")
 | ||
|                 return ExistMediaInfo(type=MediaType.TV, seasons=seasons)
 | ||
|         # 不存在
 | ||
|         return None
 | ||
| 
 | ||
|     @staticmethod
 | ||
|     def delete_all_version_files(path: Path) -> bool:
 | ||
|         """
 | ||
|         删除目录下的所有版本文件
 | ||
|         :param path: 目录路径
 | ||
|         """
 | ||
|         if not path.exists():
 | ||
|             return False
 | ||
|         # 识别文件中的季集信息
 | ||
|         meta = MetaInfoPath(path)
 | ||
|         season = meta.season
 | ||
|         episode = meta.episode
 | ||
|         # 检索媒体文件
 | ||
|         logger.warn(f"正在删除目标目录中其它版本的文件:{path.parent}")
 | ||
|         media_files = SystemUtils.list_files(directory=path.parent, extensions=settings.RMT_MEDIAEXT)
 | ||
|         if not media_files:
 | ||
|             logger.info(f"目录中没有媒体文件:{path.parent}")
 | ||
|             return False
 | ||
|         # 删除文件
 | ||
|         for media_file in media_files:
 | ||
|             if media_file == path:
 | ||
|                 continue
 | ||
|             # 识别文件中的季集信息
 | ||
|             filemeta = MetaInfoPath(media_file)
 | ||
|             # 相同季集的文件才删除
 | ||
|             if filemeta.season != season or filemeta.episode != episode:
 | ||
|                 continue
 | ||
|             logger.info(f"正在删除文件:{media_file}")
 | ||
|             media_file.unlink()
 | ||
|         return True
 |