import time
from pathlib import Path
from typing import Optional, List, Tuple, Union
from xml.dom import minidom

from app.core.config import settings
from app.core.context import MediaInfo
from app.core.meta_info import MetaInfo
from app.core.meta import MetaBase
from app.log import logger
from app.modules import _ModuleBase
from app.modules.themoviedb.category import CategoryHelper
from app.modules.themoviedb.tmdb import TmdbHelper
from app.modules.themoviedb.tmdb_cache import TmdbCache
from app.utils.dom import DomUtils
from app.utils.http import RequestUtils
from app.utils.system import SystemUtils
from app.utils.types import MediaType


class TheMovieDb(_ModuleBase):
    """
    TMDB media information matching, searching and metadata scraping.
    """

    # Recognition metadata cache
    cache: TmdbCache = None
    # TMDB helper
    tmdb: TmdbHelper = None
    # Secondary category helper
    category: CategoryHelper = None

    def init_module(self) -> None:
        """
        Create the cache, TMDB helper and category helper used by this module.
        """
        self.cache = TmdbCache()
        self.tmdb = TmdbHelper()
        self.category = CategoryHelper()

    def stop(self):
        """
        Persist the recognition cache.
        """
        self.cache.save()

    def init_setting(self) -> Tuple[str, Union[str, bool]]:
        """
        No setting switch is declared for this module (returns None).
        """
        pass

    def recognize_media(self, meta: MetaBase,
                        tmdbid: str = None) -> Optional[MediaInfo]:
        """
        Recognize media information.
        :param meta: Parsed metadata to recognize
        :param tmdbid: TMDB id; when given (and nothing is cached) details are queried directly
        :return: Recognized media information with its category set, or None
        """
        if not meta:
            return None
        cache_info = self.cache.get(meta)
        if not cache_info:
            # Nothing cached for this metadata
            if tmdbid:
                # Query details directly
                info = self.tmdb.get_info(mtype=meta.type, tmdbid=tmdbid)
            else:
                if meta.type != MediaType.TV and not meta.year:
                    info = self.tmdb.search_multi(meta.get_name())
                else:
                    if meta.type == MediaType.TV:
                        # Known to be a TV show
                        info = self.tmdb.match(name=meta.get_name(),
                                               year=meta.year,
                                               mtype=meta.type,
                                               season_year=meta.year,
                                               season_number=meta.begin_season)
                        # Retry without the year only when the year-constrained
                        # match found nothing; otherwise a successful match
                        # would be discarded and replaced by a looser one.
                        if not info and meta.year:
                            info = self.tmdb.match(name=meta.get_name(),
                                                   mtype=meta.type)
                    else:
                        # A year is present: try as a movie first
                        info = self.tmdb.match(name=meta.get_name(),
                                               year=meta.year,
                                               mtype=MediaType.MOVIE)
                        # Then as a TV show
                        if not info:
                            info = self.tmdb.match(name=meta.get_name(),
                                                   year=meta.year,
                                                   mtype=MediaType.TV)
                        # Finally drop both year and type
                        if not info:
                            info = self.tmdb.search_multi(name=meta.get_name())
                if not info:
                    # Fall back to the website search
                    info = self.tmdb.search_web(name=meta.get_name(),
                                                mtype=meta.type)
                # Search results carry no "genres"; fetch the full record
                if info and not info.get("genres"):
                    info = self.tmdb.get_info(mtype=info.get("media_type"),
                                              tmdbid=info.get("id"))
            # Save to cache (also when nothing was found)
            self.cache.update(meta, info)
        else:
            # Use the cached recognition result
            if cache_info.get("title"):
                logger.info(f"使用识别缓存:{cache_info.get('title')}")
                info = self.tmdb.get_info(mtype=cache_info.get("type"),
                                          tmdbid=cache_info.get("id"))
            else:
                info = None

        if info:
            # Determine the secondary category
            if info.get('media_type') == MediaType.MOVIE:
                cat = self.category.get_movie_category(info)
            else:
                cat = self.category.get_tv_category(info)
            # Wrap the TMDB info and return
            mediainfo = MediaInfo(tmdb_info=info)
            mediainfo.set_category(cat)
            return mediainfo

        return None

    def search_medias(self, meta: MetaBase) -> Optional[List[MediaInfo]]:
        """
        Search media information.
        :param meta: Parsed metadata to search for
        :return: Matching media information; None when TMDB is not the
                 configured search source, an empty list when there is no name
        """
        # Return None when this source is not enabled
        if settings.SEARCH_SOURCE != "themoviedb":
            return None
        name = meta.get_name()
        if not name:
            return []
        if not meta.type and not meta.year:
            results = self.tmdb.search_multiis(name)
        else:
            if not meta.type:
                candidates = list(self.tmdb.search_movies(name, meta.year) or [])
                candidates.extend(self.tmdb.search_tv_tmdbinfos(name, meta.year) or [])
                # De-duplicate by equality rather than with set(): the sort
                # key below calls .get() on the items, and dict items are
                # unhashable, so set() would raise TypeError.
                results = []
                for item in candidates:
                    if item not in results:
                        results.append(item)
                # Combined results need sorting, newest first
                results.sort(
                    key=lambda x: x.get("release_date") or x.get("first_air_date") or "0000-00-00",
                    reverse=True
                )
            elif meta.type == MediaType.MOVIE:
                results = self.tmdb.search_movies(name, meta.year)
            else:
                results = self.tmdb.search_tv_tmdbinfos(name, meta.year)

        return [MediaInfo(tmdb_info=info) for info in results or []]

    def scrape_metadata(self, path: Path, mediainfo: MediaInfo,
                        force_nfo: bool = False, force_pic: bool = False) -> None:
        """
        Scrape metadata for every media file found under a path.
        :param path: Media file path
        :param mediainfo: Recognized media information
        :param force_nfo: Force NFO scraping (accepted but not used here)
        :param force_pic: Force image scraping (accepted but not used here)
        :return: None
        """
        if settings.SCRAP_SOURCE != "themoviedb":
            return None
        # All files under the path with a configured media extension
        for file in SystemUtils.list_files_with_extensions(path, settings.RMT_MEDIAEXT):
            if not file:
                continue
            logger.info(f"开始刮削媒体库文件:{file} ...")
            self.gen_scraper_files(mediainfo=mediainfo, file_path=file)
            logger.info(f"{file} 刮削完成")

    @staticmethod
    def _is_http_url(value) -> bool:
        """
        Tell whether a MediaInfo attribute value is a string starting with "http".
        vars(mediainfo) may hold non-string values, so the type is checked first.
        """
        return isinstance(value, str) and value.startswith("http")

    def gen_scraper_files(self, mediainfo: MediaInfo, file_path: Path):
        """
        Generate the scraper files (NFO and images) for one media file.
        Existing NFO/image files are left untouched. Any exception is logged
        and swallowed.
        :param mediainfo: Media information
        :param file_path: Media file path
        """

        def _find_episode(_seasoninfo: dict, _episode: int):
            """
            Pick the episode entry with the given number out of the season info.
            """
            for _episode_info in _seasoninfo.get("episodes") or []:
                if _episode_info.get("episode_number") == _episode:
                    return _episode_info
            return {}

        try:
            # Movie
            if mediainfo.type == MediaType.MOVIE:
                # Only when no NFO exists yet
                if not file_path.with_name("movie.nfo").exists() \
                        and not file_path.with_suffix(".nfo").exists():
                    # Movie description file
                    self.__gen_movie_nfo_file(mediainfo=mediainfo,
                                              file_path=file_path)
                # Movie images: every "*_path" attribute holding a URL
                for attr_name, attr_value in vars(mediainfo).items():
                    if attr_value \
                            and attr_name.endswith("_path") \
                            and self._is_http_url(attr_value):
                        image_name = attr_name.replace("_path", "") + Path(attr_value).suffix
                        self.__save_image(url=attr_value,
                                          file_path=file_path.with_name(image_name))
            # TV show
            else:
                # Parse season/episode from the file name
                meta = MetaInfo(file_path.stem)
                # Only when it does not exist yet (looked up two levels above the file)
                if not file_path.parent.with_name("tvshow.nfo").exists():
                    # Show root description file
                    self.__gen_tv_nfo_file(mediainfo=mediainfo,
                                           dir_path=file_path.parents[1])
                # Show root images: non-season "*_path" attributes holding a URL
                for attr_name, attr_value in vars(mediainfo).items():
                    if attr_value \
                            and attr_name.endswith("_path") \
                            and not attr_name.startswith("season") \
                            and self._is_http_url(attr_value):
                        image_name = attr_name.replace("_path", "") + Path(attr_value).suffix
                        self.__save_image(url=attr_value,
                                          file_path=file_path.parent.with_name(image_name))
                # Query season details
                seasoninfo = self.tmdb.get_tv_season_detail(mediainfo.tmdb_id,
                                                            meta.begin_season)
                if seasoninfo:
                    # Season directory NFO
                    if not file_path.with_name("season.nfo").exists():
                        self.__gen_tv_season_nfo_file(seasoninfo=seasoninfo,
                                                      season=meta.begin_season,
                                                      season_path=file_path.parent)
                    # Season images: attributes named "season*" holding a URL
                    for attr_name, attr_value in vars(mediainfo).items():
                        if attr_value \
                                and attr_name.startswith("season") \
                                and self._is_http_url(attr_value):
                            # NOTE(review): "season" is replaced by "<NN>-", so a
                            # name such as "season_poster" would become
                            # "01-_poster" — confirm this is the intended file name.
                            image_name = attr_name.replace("_path", "").replace(
                                "season", f"{str(meta.begin_season).rjust(2, '0')}-") \
                                + Path(attr_value).suffix
                            self.__save_image(url=attr_value,
                                              file_path=file_path.parent.with_name(image_name))
                    # Query episode details
                    episodeinfo = _find_episode(seasoninfo, meta.begin_episode)
                    if episodeinfo:
                        # Episode NFO
                        if not file_path.with_suffix(".nfo").exists():
                            self.__gen_tv_episode_nfo_file(episodeinfo=episodeinfo,
                                                           season=meta.begin_season,
                                                           episode=meta.begin_episode,
                                                           file_path=file_path)
                        # Episode still, saved next to the media file
                        if episodeinfo.get('still_path'):
                            self.__save_image(
                                f"https://{settings.TMDB_IMAGE_DOMAIN}/t/p/original{episodeinfo.get('still_path')}",
                                file_path.with_suffix(Path(episodeinfo.get('still_path')).suffix))
        except Exception as e:
            logger.error(f"{file_path} 刮削失败:{e}")

    @staticmethod
    def __gen_common_nfo(mediainfo: MediaInfo, doc, root):
        """
        Add the NFO nodes shared by movies and TV shows to root.
        :param mediainfo: Media information
        :param doc: minidom document the nodes are created in
        :param root: Parent node receiving the new nodes
        :return: The same document
        """
        # TMDB info
        tmdbinfo = mediainfo.tmdb_info or {}
        # Date added
        DomUtils.add_node(doc, root, "dateadded",
                          time.strftime('%Y-%m-%d %H:%M:%S',
                                        time.localtime(time.time())))
        # TMDB
        DomUtils.add_node(doc, root, "tmdbid", mediainfo.tmdb_id or "")
        uniqueid_tmdb = DomUtils.add_node(doc, root, "uniqueid", mediainfo.tmdb_id or "")
        uniqueid_tmdb.setAttribute("type", "tmdb")
        uniqueid_tmdb.setAttribute("default", "true")
        # TVDB
        if mediainfo.tvdb_id:
            DomUtils.add_node(doc, root, "tvdbid", mediainfo.tvdb_id)
            uniqueid_tvdb = DomUtils.add_node(doc, root, "uniqueid", mediainfo.tvdb_id)
            uniqueid_tvdb.setAttribute("type", "tvdb")
        # IMDB: when present it becomes the default unique id instead of TMDB
        if mediainfo.imdb_id:
            DomUtils.add_node(doc, root, "imdbid", mediainfo.imdb_id)
            uniqueid_imdb = DomUtils.add_node(doc, root, "uniqueid", mediainfo.imdb_id)
            uniqueid_imdb.setAttribute("type", "imdb")
            uniqueid_imdb.setAttribute("default", "true")
            uniqueid_tmdb.setAttribute("default", "false")

        # Overview
        xplot = DomUtils.add_node(doc, root, "plot")
        xplot.appendChild(doc.createCDATASection(mediainfo.overview or ""))
        xoutline = DomUtils.add_node(doc, root, "outline")
        xoutline.appendChild(doc.createCDATASection(mediainfo.overview or ""))
        # Directors
        for director in mediainfo.directors or []:
            xdirector = DomUtils.add_node(doc, root, "director", director.get("name") or "")
            xdirector.setAttribute("tmdbid", str(director.get("id") or ""))
        # Actors
        for actor in mediainfo.actors or []:
            xactor = DomUtils.add_node(doc, root, "actor")
            DomUtils.add_node(doc, xactor, "name", actor.get("name") or "")
            DomUtils.add_node(doc, xactor, "type", "Actor")
            DomUtils.add_node(doc, xactor, "role", actor.get("character") or actor.get("role") or "")
            # An order of 0 is valid, so only None is replaced
            DomUtils.add_node(doc, xactor, "order",
                              actor.get("order") if actor.get("order") is not None else "")
            DomUtils.add_node(doc, xactor, "tmdbid", actor.get("id") or "")
            DomUtils.add_node(doc, xactor, "thumb", actor.get('image'))
            DomUtils.add_node(doc, xactor, "profile", actor.get('profile'))
        # Genres
        genres = tmdbinfo.get("genres") or []
        for genre in genres:
            DomUtils.add_node(doc, root, "genre", genre.get("name") or "")
        # Rating
        DomUtils.add_node(doc, root, "rating", mediainfo.vote_average or "0")
        # Certification
        if tmdbinfo.get("releases") and tmdbinfo.get("releases").get("countries"):
            releases = [i for i in tmdbinfo.get("releases").get("countries")
                        if i.get("certification") and i.get("certification").strip()]
            # No domestic rating system, so the US certification is used
            us_release = next((c for c in releases if c.get("iso_3166_1") == "US"), None)
            if us_release:
                DomUtils.add_node(doc, root, "mpaa", us_release.get("certification") or "")

        return doc

    def __gen_movie_nfo_file(self, mediainfo: MediaInfo, file_path: Path):
        """
        Generate the NFO description file of a movie.
        :param mediainfo: Recognized media information
        :param file_path: Movie file path; the NFO is written next to it
        """
        # Start building the XML
        logger.info(f"正在生成电影NFO文件:{file_path.name}")
        doc = minidom.Document()
        root = DomUtils.add_node(doc, doc, "movie")
        # Common part
        doc = self.__gen_common_nfo(mediainfo=mediainfo,
                                    doc=doc,
                                    root=root)
        # Title
        DomUtils.add_node(doc, root, "title", mediainfo.title or "")
        DomUtils.add_node(doc, root, "originaltitle", mediainfo.original_title or "")
        # Release date
        DomUtils.add_node(doc, root, "premiered", mediainfo.release_date or "")
        # Year
        DomUtils.add_node(doc, root, "year", mediainfo.year or "")
        # Save
        self.__save_nfo(doc, file_path.with_suffix(".nfo"))

    def __gen_tv_nfo_file(self, mediainfo: MediaInfo, dir_path: Path):
        """
        Generate the NFO description file of a TV show.
        :param mediainfo: Media information
        :param dir_path: TV show root directory; "tvshow.nfo" is written into it
        """
        # Start building the XML
        logger.info(f"正在生成电视剧NFO文件:{dir_path.name}")
        doc = minidom.Document()
        root = DomUtils.add_node(doc, doc, "tvshow")
        # Common part
        doc = self.__gen_common_nfo(mediainfo=mediainfo,
                                    doc=doc,
                                    root=root)
        # Title
        DomUtils.add_node(doc, root, "title", mediainfo.title or "")
        DomUtils.add_node(doc, root, "originaltitle", mediainfo.original_title or "")
        # Release date
        DomUtils.add_node(doc, root, "premiered", mediainfo.release_date or "")
        # Year
        DomUtils.add_node(doc, root, "year", mediainfo.year or "")
        DomUtils.add_node(doc, root, "season", "-1")
        DomUtils.add_node(doc, root, "episode", "-1")
        # Save
        self.__save_nfo(doc, dir_path.joinpath("tvshow.nfo"))

    def __gen_tv_season_nfo_file(self, seasoninfo: dict, season: int, season_path: Path):
        """
        Generate the NFO description file of a TV season.
        :param seasoninfo: TMDB season information
        :param season: Season number
        :param season_path: Season directory; "season.nfo" is written into it
        """
        logger.info(f"正在生成季NFO文件:{season_path.name}")
        doc = minidom.Document()
        root = DomUtils.add_node(doc, doc, "season")
        # Date added
        DomUtils.add_node(doc, root, "dateadded",
                          time.strftime('%Y-%m-%d %H:%M:%S',
                                        time.localtime(time.time())))
        # Overview
        xplot = DomUtils.add_node(doc, root, "plot")
        xplot.appendChild(doc.createCDATASection(seasoninfo.get("overview") or ""))
        xoutline = DomUtils.add_node(doc, root, "outline")
        xoutline.appendChild(doc.createCDATASection(seasoninfo.get("overview") or ""))
        # Title
        DomUtils.add_node(doc, root, "title", "季 %s" % season)
        # Air date
        DomUtils.add_node(doc, root, "premiered", seasoninfo.get("air_date") or "")
        DomUtils.add_node(doc, root, "releasedate", seasoninfo.get("air_date") or "")
        # Air year: first four characters of the air date
        DomUtils.add_node(doc, root, "year",
                          seasoninfo.get("air_date")[:4] if seasoninfo.get("air_date") else "")
        # Season number
        DomUtils.add_node(doc, root, "seasonnumber", str(season))
        # Save
        self.__save_nfo(doc, season_path.joinpath("season.nfo"))

    def __gen_tv_episode_nfo_file(self,
                                  episodeinfo: dict,
                                  season: int,
                                  episode: int,
                                  file_path: Path):
        """
        Generate the NFO description file of a TV episode.
        :param episodeinfo: TMDB episode metadata
        :param season: Season number
        :param episode: Episode number
        :param file_path: Episode file path; the NFO is written next to it
        """
        # Start building the episode information
        logger.info(f"正在生成剧集NFO文件:{file_path.name}")
        doc = minidom.Document()
        root = DomUtils.add_node(doc, doc, "episodedetails")
        # Date added
        DomUtils.add_node(doc, root, "dateadded",
                          time.strftime('%Y-%m-%d %H:%M:%S',
                                        time.localtime(time.time())))
        # Unique id (TMDB)
        uniqueid = DomUtils.add_node(doc, root, "uniqueid", episodeinfo.get("id") or "")
        uniqueid.setAttribute("type", "tmdb")
        uniqueid.setAttribute("default", "true")
        # tmdbid
        DomUtils.add_node(doc, root, "tmdbid", episodeinfo.get("id") or "")
        # Title, falling back to a generated one
        DomUtils.add_node(doc, root, "title", episodeinfo.get("name") or "第 %s 集" % episode)
        # Overview
        xplot = DomUtils.add_node(doc, root, "plot")
        xplot.appendChild(doc.createCDATASection(episodeinfo.get("overview") or ""))
        xoutline = DomUtils.add_node(doc, root, "outline")
        xoutline.appendChild(doc.createCDATASection(episodeinfo.get("overview") or ""))
        # Air date
        DomUtils.add_node(doc, root, "aired", episodeinfo.get("air_date") or "")
        # Year: first four characters of the air date
        DomUtils.add_node(doc, root, "year",
                          episodeinfo.get("air_date")[:4] if episodeinfo.get("air_date") else "")
        # Season
        DomUtils.add_node(doc, root, "season", str(season))
        # Episode
        DomUtils.add_node(doc, root, "episode", str(episode))
        # Rating
        DomUtils.add_node(doc, root, "rating", episodeinfo.get("vote_average") or "0")
        # Directors: crew members whose department is "Directing"
        directors = episodeinfo.get("crew") or []
        for director in directors:
            if director.get("known_for_department") == "Directing":
                xdirector = DomUtils.add_node(doc, root, "director", director.get("name") or "")
                xdirector.setAttribute("tmdbid", str(director.get("id") or ""))
        # Actors: guest stars whose department is "Acting"
        actors = episodeinfo.get("guest_stars") or []
        for actor in actors:
            if actor.get("known_for_department") == "Acting":
                xactor = DomUtils.add_node(doc, root, "actor")
                DomUtils.add_node(doc, xactor, "name", actor.get("name") or "")
                DomUtils.add_node(doc, xactor, "type", "Actor")
                DomUtils.add_node(doc, xactor, "tmdbid", actor.get("id") or "")
        # Save the file
        self.__save_nfo(doc, file_path.with_suffix(".nfo"))

    @staticmethod
    def __save_image(url: str, file_path: Path):
        """
        Download an image and save it; does nothing when the file already exists.
        Failures are logged, never raised.
        :param url: Image URL
        :param file_path: Target file path
        """
        if file_path.exists():
            return
        try:
            logger.info(f"正在下载{file_path.stem}图片:{url} ...")
            r = RequestUtils().get_res(url=url)
            if r:
                file_path.write_bytes(r.content)
                logger.info(f"图片已保存:{file_path}")
            else:
                logger.info(f"{file_path.stem}图片下载失败,请检查网络连通性")
        except Exception as err:
            logger.error(f"{file_path.stem}图片下载失败:{err}")

    @staticmethod
    def __save_nfo(doc, file_path: Path):
        """
        Save the NFO document as UTF-8 XML; does nothing when the file already exists.
        :param doc: minidom document
        :param file_path: Target file path
        """
        if file_path.exists():
            return
        xml_str = doc.toprettyxml(indent=" ", encoding="utf-8")
        file_path.write_bytes(xml_str)
        logger.info(f"NFO文件已保存:{file_path}")