import re from pathlib import Path from threading import Lock from typing import Optional, List, Tuple, Union, Dict from jinja2 import Template from app.core.config import settings from app.core.context import MediaInfo from app.core.meta import MetaBase from app.core.metainfo import MetaInfo from app.log import logger from app.modules import _ModuleBase from app.schemas import TransferInfo, ExistMediaInfo, TmdbEpisode from app.schemas.types import MediaType from app.utils.system import SystemUtils lock = Lock() class FileTransferModule(_ModuleBase): def init_module(self) -> None: pass def stop(self): pass def init_setting(self) -> Tuple[str, Union[str, bool]]: pass def transfer(self, path: Path, meta: MetaBase, mediainfo: MediaInfo, transfer_type: str, target: Path = None, episodes_info: List[TmdbEpisode] = None) -> TransferInfo: """ 文件转移 :param path: 文件路径 :param meta: 预识别的元数据,仅单文件转移时传递 :param mediainfo: 识别的媒体信息 :param transfer_type: 转移方式 :param target: 目标路径 :param episodes_info: 当前季的全部集信息 :return: {path, target_path, message} """ # 获取目标路径 if not target: target = self.get_target_path(in_path=path) elif not target.exists() or target.is_file(): # 目的路径不存在或者是文件时,找对应的媒体库目录 target = self.get_library_path(target) if not target: logger.error("未找到媒体库目录,无法转移文件") return TransferInfo(success=False, path=path, message="未找到媒体库目录") # 转移 return self.transfer_media(in_path=path, in_meta=meta, mediainfo=mediainfo, transfer_type=transfer_type, target_dir=target, episodes_info=episodes_info) @staticmethod def __transfer_command(file_item: Path, target_file: Path, transfer_type: str) -> int: """ 使用系统命令处理单个文件 :param file_item: 文件路径 :param target_file: 目标文件路径 :param transfer_type: RmtMode转移方式 """ with lock: # 转移 if transfer_type == 'link': # 硬链接 retcode, retmsg = SystemUtils.link(file_item, target_file) elif transfer_type == 'softlink': # 软链接 retcode, retmsg = SystemUtils.softlink(file_item, target_file) elif transfer_type == 'move': # 移动 retcode, retmsg = SystemUtils.move(file_item, target_file) elif transfer_type == 'rclone_move': # Rclone 移动 retcode, retmsg = SystemUtils.rclone_move(file_item, target_file) elif transfer_type == 'rclone_copy': # Rclone 复制 retcode, retmsg = SystemUtils.rclone_copy(file_item, target_file) else: # 复制 retcode, retmsg = SystemUtils.copy(file_item, target_file) if retcode != 0: logger.error(retmsg) return retcode def __transfer_other_files(self, org_path: Path, new_path: Path, transfer_type: str, over_flag: bool) -> int: """ 根据文件名转移其他相关文件 :param org_path: 原文件名 :param new_path: 新文件名 :param transfer_type: RmtMode转移方式 :param over_flag: 是否覆盖,为True时会先删除再转移 """ retcode = self.__transfer_subtitles(org_path, new_path, transfer_type) if retcode != 0: return retcode retcode = self.__transfer_audio_track_files(org_path, new_path, transfer_type, over_flag) if retcode != 0: return retcode return 0 def __transfer_subtitles(self, org_path: Path, new_path: Path, transfer_type: str) -> int: """ 根据文件名转移对应字幕文件 :param org_path: 原文件名 :param new_path: 新文件名 :param transfer_type: RmtMode转移方式 """ # 字幕正则式 _zhcn_sub_re = r"([.\[(](((zh[-_])?(cn|ch[si]|sg|sc))|zho?" \ r"|chinese|(cn|ch[si]|sg|zho?|eng)[-_&](cn|ch[si]|sg|zho?|eng)" \ r"|简[体中]?)[.\])])" \ r"|([\u4e00-\u9fa5]{0,3}[中双][\u4e00-\u9fa5]{0,2}[字文语][\u4e00-\u9fa5]{0,3})" \ r"|简体|简中|JPSC" \ r"|(? 通过new_sub_tag_list 获取新的tag附加到字幕文件名, 继续检查是否能转移 except OSError as reason: logger.info(f"字幕 {new_file} 出错了,原因: {reason}") return 0 def __transfer_audio_track_files(self, org_path: Path, new_path: Path, transfer_type: str, over_flag: bool) -> int: """ 根据文件名转移对应音轨文件 :param org_path: 原文件名 :param new_path: 新文件名 :param transfer_type: RmtMode转移方式 :param over_flag: 是否覆盖,为True时会先删除再转移 """ dir_name = org_path.parent file_name = org_path.name file_list: List[Path] = SystemUtils.list_files(dir_name, ['.mka']) pending_file_list: List[Path] = [file for file in file_list if org_path.stem == file.stem] if len(pending_file_list) == 0: logger.debug(f"{dir_name} 目录下没有找到匹配的音轨文件") else: logger.debug("音轨文件清单:" + str(pending_file_list)) for track_file in pending_file_list: track_ext = track_file.suffix new_track_file = new_path.with_name(new_path.stem + track_ext) if new_track_file.exists(): if not over_flag: logger.warn(f"音轨文件已存在:{new_track_file}") continue else: logger.info(f"正在删除已存在的音轨文件:{new_track_file}") new_track_file.unlink() try: logger.info(f"正在转移音轨文件:{track_file} 到 {new_track_file}") retcode = self.__transfer_command(file_item=track_file, target_file=new_track_file, transfer_type=transfer_type) if retcode == 0: logger.info(f"音轨文件 {file_name} {transfer_type}完成") else: logger.error(f"音轨文件 {file_name} {transfer_type}失败,错误码:{retcode}") except OSError as reason: logger.error(f"音轨文件 {file_name} {transfer_type}失败:{reason}") return 0 def __transfer_dir(self, file_path: Path, new_path: Path, transfer_type: str) -> int: """ 转移整个文件夹 :param file_path: 原路径 :param new_path: 新路径 :param transfer_type: RmtMode转移方式 """ logger.info(f"正在{transfer_type}目录:{file_path} 到 {new_path}") # 复制 retcode = self.__transfer_dir_files(src_dir=file_path, target_dir=new_path, transfer_type=transfer_type) if retcode == 0: logger.info(f"文件 {file_path} {transfer_type}完成") else: logger.error(f"文件{file_path} {transfer_type}失败,错误码:{retcode}") return retcode def __transfer_dir_files(self, src_dir: Path, target_dir: Path, transfer_type: str) -> int: """ 按目录结构转移目录下所有文件 :param src_dir: 原路径 :param target_dir: 新路径 :param transfer_type: RmtMode转移方式 """ retcode = 0 for file in src_dir.glob("**/*"): # 过滤掉目录 if file.is_dir(): continue # 使用target_dir的父目录作为新的父目录 new_file = target_dir.joinpath(file.relative_to(src_dir)) if new_file.exists(): logger.warn(f"{new_file} 文件已存在") continue if not new_file.parent.exists(): new_file.parent.mkdir(parents=True, exist_ok=True) retcode = self.__transfer_command(file_item=file, target_file=new_file, transfer_type=transfer_type) if retcode != 0: break return retcode def __transfer_file(self, file_item: Path, new_file: Path, transfer_type: str, over_flag: bool = False) -> int: """ 转移一个文件,同时处理其他相关文件 :param file_item: 原文件路径 :param new_file: 新文件路径 :param transfer_type: RmtMode转移方式 :param over_flag: 是否覆盖,为True时会先删除再转移 """ if new_file.exists(): if not over_flag: logger.warn(f"文件已存在:{new_file}") return 0 else: logger.info(f"正在删除已存在的文件:{new_file}") new_file.unlink() logger.info(f"正在转移文件:{file_item} 到 {new_file}") # 创建父目录 new_file.parent.mkdir(parents=True, exist_ok=True) retcode = self.__transfer_command(file_item=file_item, target_file=new_file, transfer_type=transfer_type) if retcode == 0: logger.info(f"文件 {file_item} {transfer_type}完成") else: logger.error(f"文件 {file_item} {transfer_type}失败,错误码:{retcode}") return retcode # 处理其他相关文件 return self.__transfer_other_files(org_path=file_item, new_path=new_file, transfer_type=transfer_type, over_flag=over_flag) @staticmethod def __get_dest_dir(mediainfo: MediaInfo, target_dir: Path) -> Path: """ 根据设置并装媒体库目录 :param mediainfo: 媒体信息 :target_dir: 媒体库根目录 """ if mediainfo.type == MediaType.MOVIE: # 电影 if settings.LIBRARY_MOVIE_NAME: target_dir = target_dir / settings.LIBRARY_MOVIE_NAME / mediainfo.category else: # 目的目录加上类型和二级分类 target_dir = target_dir / mediainfo.type.value / mediainfo.category if mediainfo.type == MediaType.TV: # 电视剧 if settings.LIBRARY_ANIME_NAME \ and mediainfo.genre_ids \ and set(mediainfo.genre_ids).intersection(set(settings.ANIME_GENREIDS)): # 动漫 target_dir = target_dir / settings.LIBRARY_ANIME_NAME / mediainfo.category elif settings.LIBRARY_TV_NAME: # 电视剧 target_dir = target_dir / settings.LIBRARY_TV_NAME / mediainfo.category else: # 目的目录加上类型和二级分类 target_dir = target_dir / mediainfo.type.value / mediainfo.category return target_dir def transfer_media(self, in_path: Path, in_meta: MetaBase, mediainfo: MediaInfo, transfer_type: str, target_dir: Path, episodes_info: List[TmdbEpisode] = None ) -> TransferInfo: """ 识别并转移一个文件或者一个目录下的所有文件 :param in_path: 转移的路径,可能是一个文件也可以是一个目录 :param in_meta:预识别元数据 :param mediainfo: 媒体信息 :param target_dir: 媒体库根目录 :param transfer_type: 文件转移方式 :param episodes_info: 当前季的全部集信息 :return: TransferInfo、错误信息 """ # 检查目录路径 if not in_path.exists(): return TransferInfo(success=False, path=in_path, message=f"{in_path} 路径不存在") if transfer_type not in ['rclone_copy', 'rclone_move']: # 检查目标路径 if not target_dir.exists(): return TransferInfo(success=False, path=in_path, message=f"{target_dir} 目标路径不存在") # 媒体库目的目录 target_dir = self.__get_dest_dir(mediainfo=mediainfo, target_dir=target_dir) # 重命名格式 rename_format = settings.TV_RENAME_FORMAT \ if mediainfo.type == MediaType.TV else settings.MOVIE_RENAME_FORMAT # 判断是否为文件夹 if in_path.is_dir(): # 转移整个目录 # 是否蓝光原盘 bluray_flag = SystemUtils.is_bluray_dir(in_path) if bluray_flag: logger.info(f"{in_path} 是蓝光原盘文件夹") # 原文件大小 file_size = in_path.stat().st_size # 目的路径 new_path = self.get_rename_path( path=target_dir, template_string=rename_format, rename_dict=self.__get_naming_dict(meta=in_meta, mediainfo=mediainfo) ).parent # 转移蓝光原盘 retcode = self.__transfer_dir(file_path=in_path, new_path=new_path, transfer_type=transfer_type) if retcode != 0: logger.error(f"文件夹 {in_path} 转移失败,错误码:{retcode}") return TransferInfo(success=False, message=f"错误码:{retcode}", path=in_path, target_path=new_path, is_bluray=bluray_flag) logger.info(f"文件夹 {in_path} 转移成功") # 返回转移后的路径 return TransferInfo(success=True, path=in_path, target_path=new_path, total_size=file_size, is_bluray=bluray_flag) else: # 转移单个文件 if mediainfo.type == MediaType.TV: # 电视剧 if in_meta.begin_episode is None: logger.warn(f"文件 {in_path} 转移失败:未识别到文件集数") return TransferInfo(success=False, message=f"未识别到文件集数", path=in_path, fail_list=[str(in_path)]) # 文件结束季为空 in_meta.end_season = None # 文件总季数为1 if in_meta.total_season: in_meta.total_season = 1 # 文件不可能超过2集 if in_meta.total_episode > 2: in_meta.total_episode = 1 in_meta.end_episode = None # 目的文件名 new_file = self.get_rename_path( path=target_dir, template_string=rename_format, rename_dict=self.__get_naming_dict( meta=in_meta, mediainfo=mediainfo, episodes_info=episodes_info, file_ext=in_path.suffix ) ) # 判断是否要覆盖 overflag = False if new_file.exists(): if new_file.stat().st_size < in_path.stat().st_size: logger.info(f"目标文件已存在,但文件大小更小,将覆盖:{new_file}") overflag = True # 原文件大小 file_size = in_path.stat().st_size # 转移文件 retcode = self.__transfer_file(file_item=in_path, new_file=new_file, transfer_type=transfer_type, over_flag=overflag) if retcode != 0: logger.error(f"文件 {in_path} 转移失败,错误码:{retcode}") return TransferInfo(success=False, message=f"错误码:{retcode}", path=in_path, target_path=new_file, fail_list=[str(in_path)]) logger.info(f"文件 {in_path} 转移成功") return TransferInfo(success=True, path=in_path, target_path=new_file, file_count=1, total_size=file_size, is_bluray=False, file_list=[str(in_path)], file_list_new=[str(new_file)]) @staticmethod def __get_naming_dict(meta: MetaBase, mediainfo: MediaInfo, file_ext: str = None, episodes_info: List[TmdbEpisode] = None) -> dict: """ 根据媒体信息,返回Format字典 :param meta: 文件元数据 :param mediainfo: 识别的媒体信息 :param file_ext: 文件扩展名 :param episodes_info: 当前季的全部集信息 """ # 获取集标题 episode_title = None if meta.begin_episode and episodes_info: for episode in episodes_info: if episode.episode_number == meta.begin_episode: episode_title = episode.name break return { # 标题 "title": mediainfo.title, # 原文件名 "original_name": f"{meta.org_string}{file_ext}", # 原语种标题 "original_title": mediainfo.original_title, # 识别名称 "name": meta.name, # 年份 "year": mediainfo.year or meta.year, # 资源类型 "resourceType": meta.resource_type, # 特效 "effect": meta.resource_effect, # 版本 "edition": meta.edition, # 分辨率 "videoFormat": meta.resource_pix, # 制作组/字幕组 "releaseGroup": meta.resource_team, # 视频编码 "videoCodec": meta.video_encode, # 音频编码 "audioCodec": meta.audio_encode, # TMDBID "tmdbid": mediainfo.tmdb_id, # IMDBID "imdbid": mediainfo.imdb_id, # 季号 "season": meta.season_seq, # 集号 "episode": meta.episode_seqs, # 季集 SxxExx "season_episode": "%s%s" % (meta.season, meta.episodes), # 段/节 "part": meta.part, # 剧集标题 "episode_title": episode_title, # 文件后缀 "fileExt": file_ext, # 自定义占位符 "customization": meta.customization } @staticmethod def get_rename_path(template_string: str, rename_dict: dict, path: Path = None) -> Path: """ 生成重命名后的完整路径 """ # 创建jinja2模板对象 template = Template(template_string) # 渲染生成的字符串 render_str = template.render(rename_dict) # 目的路径 if path: return path / render_str else: return Path(render_str) @staticmethod def get_library_path(path: Path): """ 根据文件路径查询其所在的媒体库目录,查询不到的返回输入目录 """ if not path: return None if not settings.LIBRARY_PATHS: return path # 目的路径,多路径以,分隔 dest_paths = settings.LIBRARY_PATHS for libpath in dest_paths: try: if path.is_relative_to(libpath): return libpath except Exception as e: logger.debug(f"计算媒体库路径时出错:{e}") continue return path @staticmethod def get_target_path(in_path: Path = None) -> Optional[Path]: """ 计算一个最好的目的目录,有in_path时找与in_path同路径的,没有in_path时,顺序查找1个符合大小要求的,没有in_path和size时,返回第1个 :param in_path: 源目录 """ if not settings.LIBRARY_PATHS: return None # 目的路径,多路径以,分隔 dest_paths = settings.LIBRARY_PATHS # 只有一个路径,直接返回 if len(dest_paths) == 1: return dest_paths[0] # 匹配有最长共同上级路径的目录 max_length = 0 target_path = None if in_path: for path in dest_paths: try: relative = in_path.relative_to(path).as_posix() if len(relative) > max_length: max_length = len(relative) target_path = path except Exception as e: logger.debug(f"计算目标路径时出错:{e}") continue if target_path: return target_path # 顺序匹配第1个满足空间存储要求的目录 if in_path.exists(): file_size = in_path.stat().st_size for path in dest_paths: if SystemUtils.free_space(path) > file_size: return path # 默认返回第1个 return dest_paths[0] def media_exists(self, mediainfo: MediaInfo, itemid: str = None) -> Optional[ExistMediaInfo]: """ 判断媒体文件是否存在于本地文件系统 :param mediainfo: 识别的媒体信息 :param itemid: 媒体服务器ItemID :return: 如不存在返回None,存在时返回信息,包括每季已存在所有集{type: movie/tv, seasons: {season: [episodes]}} """ if not settings.LIBRARY_PATHS: return None # 目的路径 dest_paths = settings.LIBRARY_PATHS # 检查每一个媒体库目录 for dest_path in dest_paths: # 媒体库路径 target_dir = self.get_target_path(dest_path) if not target_dir: continue # 媒体分类路径 target_dir = self.__get_dest_dir(mediainfo=mediainfo, target_dir=target_dir) # 重命名格式 rename_format = settings.TV_RENAME_FORMAT \ if mediainfo.type == MediaType.TV else settings.MOVIE_RENAME_FORMAT # 相对路径 meta = MetaInfo(mediainfo.title) rel_path = self.get_rename_path( template_string=rename_format, rename_dict=self.__get_naming_dict(meta=meta, mediainfo=mediainfo) ) # 取相对路径的第1层目录 if rel_path.parts: media_path = target_dir / rel_path.parts[0] else: continue # 检查媒体文件夹是否存在 if not media_path.exists(): continue # 检索媒体文件 media_files = SystemUtils.list_files(directory=media_path, extensions=settings.RMT_MEDIAEXT) if not media_files: continue if mediainfo.type == MediaType.MOVIE: # 电影存在任何文件为存在 logger.info(f"文件系统已存在:{mediainfo.title_year}") return ExistMediaInfo(type=MediaType.MOVIE) else: # 电视剧检索集数 seasons: Dict[int, list] = {} for media_file in media_files: file_meta = MetaInfo(media_file.stem) season_index = file_meta.begin_season or 1 episode_index = file_meta.begin_episode if not episode_index: continue if season_index not in seasons: seasons[season_index] = [] seasons[season_index].append(episode_index) # 返回剧集情况 logger.info(f"{mediainfo.title_year} 文件系统已存在:{seasons}") return ExistMediaInfo(type=MediaType.TV, seasons=seasons) # 不存在 return None