diff --git a/app/chain/transfer.py b/app/chain/transfer.py new file mode 100644 index 00000000..072ab933 --- /dev/null +++ b/app/chain/transfer.py @@ -0,0 +1,16 @@ +from app.chain import _ChainBase + + +class TransferChain(_ChainBase): + """ + 文件转移处理链 + """ + + def process(self) -> bool: + """ + 根据媒体信息,执行搜索 + """ + # 从下载器获取种子列表 + # 识别 + # 转移 + pass diff --git a/app/core/config.py b/app/core/config.py index 63592221..38e6e4f6 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -127,6 +127,10 @@ class Settings(BaseSettings): LIBRARY_CATEGORY: bool = True # 豆瓣用户ID,用于同步豆瓣数据,使用,分隔 DOUBAN_USER_IDS: str = "" + # 电影重命名格式 + MOVIE_RENAME_FORMAT: str = "{title} ({year})/{title} ({year})-{part} - {videoFormat}" + # 电视剧重命名格式 + TV_RENAME_FORMAT: str = "{title} ({year})/Season {season}/{title} - {season_episode}-{part} - 第 {episode} 集" @property def INNER_CONFIG_PATH(self): diff --git a/app/modules/__init__.py b/app/modules/__init__.py index bfd1aead..b468152d 100644 --- a/app/modules/__init__.py +++ b/app/modules/__init__.py @@ -130,9 +130,25 @@ class _ModuleBase(metaclass=ABCMeta): """ pass + def list_torrents(self, status: Union[str, list]) -> List[dict]: + """ + 获取下载器种子列表 + :param status: 种子状态 + :return: 下载器中符合状态的种子列表 + """ + pass + + def remove_torrents(self, status: Union[str, list]) -> bool: + """ + 删除下载器种子 + :param status: 种子状态 + :return: bool + """ + pass + def transfer(self, path: str, mediainfo: MediaInfo) -> Optional[bool]: """ - 文件转移 + 转移一个路径下的文件 :param path: 文件路径 :param mediainfo: 识别的媒体信息 :return: 成功或失败 diff --git a/app/modules/filetransfer/__init__.py b/app/modules/filetransfer/__init__.py index 11c912da..97b87e22 100644 --- a/app/modules/filetransfer/__init__.py +++ b/app/modules/filetransfer/__init__.py @@ -1,7 +1,15 @@ -from typing import Optional, Tuple, Union +import re +from pathlib import Path +from threading import Lock +from typing import Optional, List, Tuple, Union -from app.core import MediaInfo +from app.core import MediaInfo, MetaInfo, settings +from app.core.meta import MetaBase +from app.log import logger from app.modules import _ModuleBase +from app.utils.system import SystemUtils + +lock = Lock() class FileTransferModule(_ModuleBase): @@ -10,7 +18,7 @@ class FileTransferModule(_ModuleBase): pass def init_setting(self) -> Tuple[str, Union[str, bool]]: - return "TRANSFER_TYPE", True + pass def transfer(self, path: str, mediainfo: MediaInfo) -> Optional[bool]: """ @@ -20,3 +28,292 @@ class FileTransferModule(_ModuleBase): :return: 成功或失败 """ pass + + @staticmethod + def __transfer_command(file_item: Path, target_file: Path, rmt_mode) -> int: + """ + 使用系统命令处理单个文件 + :param file_item: 文件路径 + :param target_file: 目标文件路径 + :param rmt_mode: RmtMode转移方式 + """ + with lock: + if rmt_mode == 'link': + # 硬链接 + retcode, retmsg = SystemUtils.link(file_item, target_file) + elif rmt_mode == 'softlink': + # 软链接 + retcode, retmsg = SystemUtils.softlink(file_item, target_file) + elif rmt_mode == 'move': + # 移动 + retcode, retmsg = SystemUtils.move(file_item, target_file) + else: + # 复制 + retcode, retmsg = SystemUtils.copy(file_item, target_file) + + if retcode != 0: + logger.error(retmsg) + + return retcode + + def __transfer_other_files(self, org_path: Path, new_path: Path, rmt_mode: str, over_flag: bool) -> int: + """ + 根据文件名转移其他相关文件 + :param org_path: 原文件名 + :param new_path: 新文件名 + :param rmt_mode: RmtMode转移方式 + :param over_flag: 是否覆盖,为True时会先删除再转移 + """ + retcode = self.__transfer_subtitles(org_path, new_path, rmt_mode) + if retcode != 0: + return retcode + retcode = self.__transfer_audio_track_files(org_path, new_path, rmt_mode, over_flag) + if retcode != 0: + return retcode + return 0 + + def __transfer_subtitles(self, org_path: Path, new_path: Path, rmt_mode: str) -> int: + """ + 根据文件名转移对应字幕文件 + :param org_path: 原文件名 + :param new_path: 新文件名 + :param rmt_mode: RmtMode转移方式 + """ + # 字幕正则式 + _zhcn_sub_re = r"([.\[(](((zh[-_])?(cn|ch[si]|sg|sc))|zho?" \ + r"|chinese|(cn|ch[si]|sg|zho?|eng)[-_&](cn|ch[si]|sg|zho?|eng)" \ + r"|简[体中]?)[.\])])" \ + r"|([\u4e00-\u9fa5]{0,3}[中双][\u4e00-\u9fa5]{0,2}[字文语][\u4e00-\u9fa5]{0,3})" \ + r"|简体|简中" \ + r"|(? 通过new_sub_tag_list 获取新的tag附加到字幕文件名, 继续检查是否能转移 + except OSError as reason: + logger.info(f"字幕 {new_file} 出错了,原因: {reason}") + return 0 + + def __transfer_audio_track_files(self, org_path: Path, new_path: Path, rmt_mode: str, over_flag: bool) -> int: + """ + 根据文件名转移对应音轨文件 + :param org_path: 原文件名 + :param new_path: 新文件名 + :param rmt_mode: RmtMode转移方式 + :param over_flag: 是否覆盖,为True时会先删除再转移 + """ + dir_name = org_path.parent + file_name = org_path.name + file_list: List[Path] = SystemUtils.list_files_with_extensions(dir_name, ['.mka']) + pending_file_list: List[Path] = [file for file in file_list if org_path.stem == file.stem] + if len(pending_file_list) == 0: + logger.debug(f"{dir_name} 目录下没有找到匹配的音轨文件") + else: + logger.debug("音轨文件清单:" + str(pending_file_list)) + for track_file in pending_file_list: + track_ext = track_file.suffix + new_track_file = new_path.with_name(new_path.stem + track_ext) + if new_track_file.exists(): + if not over_flag: + logger.warn(f"音轨文件已存在:{new_track_file}") + continue + else: + logger.info(f"正在删除已存在的音轨文件:{new_track_file}") + new_track_file.unlink() + try: + logger.info(f"正在转移音轨文件:{track_file} 到 {new_track_file}") + retcode = self.__transfer_command(file_item=track_file, + target_file=new_track_file, + rmt_mode=rmt_mode) + if retcode == 0: + logger.info(f"音轨文件 {file_name} {rmt_mode}完成") + else: + logger.error(f"音轨文件 {file_name} {rmt_mode}失败,错误码:{retcode}") + except OSError as reason: + logger.error(f"音轨文件 {file_name} {rmt_mode}失败:{reason}") + return 0 + + def __transfer_bluray_dir(self, file_path: Path, new_path: Path, rmt_mode: str) -> int: + """ + 转移蓝光文件夹 + :param file_path: 原路径 + :param new_path: 新路径 + :param rmt_mode: RmtMode转移方式 + """ + logger.info(f"正在{rmt_mode}目录:{file_path} 到 {new_path}") + # 复制 + retcode = self.__transfer_dir_files(src_dir=file_path, + target_dir=new_path, + rmt_mode=rmt_mode) + if retcode == 0: + logger.info(f"文件 {file_path} {rmt_mode}完成") + else: + logger.error(f"文件{file_path} {rmt_mode}失败,错误码:{retcode}") + + return retcode + + def __transfer_dir_files(self, src_dir: Path, target_dir: Path, rmt_mode: str) -> int: + """ + 按目录结构转移所有文件 + :param src_dir: 原路径 + :param target_dir: 新路径 + :param rmt_mode: RmtMode转移方式 + """ + retcode = 0 + for file in src_dir.glob("**/*"): + new_file = target_dir.with_name(src_dir.name) + if new_file.exists(): + logger.warn(f"{new_file} 文件已存在") + continue + if not new_file.parent.exists(): + new_file.parent.mkdir(parents=True) + retcode = self.__transfer_command(file_item=file, + target_file=new_file, + rmt_mode=rmt_mode) + if retcode != 0: + break + + return retcode + + def __transfer_file(self, file_item: Path, new_file: Path, rmt_mode: str, + over_flag: bool = False, old_file: Path = None) -> int: + """ + 转移一个文件,同时处理其他相关文件 + :param file_item: 原文件路径 + :param new_file: 新文件路径 + :param rmt_mode: RmtMode转移方式 + :param over_flag: 是否覆盖,为True时会先删除再转移 + """ + if not over_flag and new_file.exists(): + logger.warn(f"文件已存在:{new_file}") + return 0 + if over_flag and old_file and old_file.exists(): + logger.info(f"正在删除已存在的文件:{old_file}") + old_file.unlink() + logger.info(f"正在转移文件:{file_item.name} 到 {new_file}") + retcode = self.__transfer_command(file_item=file_item, + target_file=new_file, + rmt_mode=rmt_mode) + if retcode == 0: + logger.info(f"文件 {file_item.name} {rmt_mode}完成") + else: + logger.error(f"文件 {file_item.name} {rmt_mode}失败,错误码:{retcode}") + return retcode + # 处理其他相关文件 + return self.__transfer_other_files(org_path=file_item, + new_path=new_file, + rmt_mode=rmt_mode, + over_flag=over_flag) + + def transfer_media(self, + in_path: Path, + meidainfo: MediaInfo, + rmt_mode: str = None, + target_dir: Path = None + ) -> Tuple[bool, str]: + """ + 识别并转移一个文件、多个文件或者目录 + :param in_path: 转移的路径,可能是一个文件也可以是一个目录 + :param target_dir: 目的文件夹,非空的转移到该文件夹,为空时则按类型转移到配置文件中的媒体库文件夹 + :param rmt_mode: 文件转移方式 + :param meidainfo: 媒体信息 + :return: 处理状态,错误信息 + """ + pass + + def __get_naming_dict(self, meta: MetaBase, mediainfo: MediaInfo) -> dict: + """ + 根据媒体信息,返回Format字典 + :param meta: 文件元数据 + :param mediainfo: 识别的媒体信息 + """ + pass + + def get_movie_dest_path(self, meta: MetaBase, mediainfo: MediaInfo) -> Tuple[str, str]: + """ + 计算电影文件路径 + :return: 电影目录、电影名称 + """ + pass + + def get_tv_dest_path(self, meta: MetaBase, mediainfo: MediaInfo) -> Tuple[str, str, str]: + """ + 计算电视剧文件路径 + :return: 电视剧目录、季目录、集名称 + """ + pass