import time from pathlib import Path from typing import List, Optional, Tuple, Union from xml.dom import minidom from app.core.context import MediaInfo from app.core.config import settings from app.core.meta_info import MetaInfo from app.core.meta import MetaBase from app.log import logger from app.modules import _ModuleBase from app.modules.douban.apiv2 import DoubanApi from app.utils.dom import DomUtils from app.utils.http import RequestUtils from app.utils.system import SystemUtils from app.utils.types import MediaType class Douban(_ModuleBase): def __init__(self): super().__init__() self.doubanapi = DoubanApi() def init_module(self) -> None: pass def stop(self): pass def init_setting(self) -> Tuple[str, Union[str, bool]]: pass def douban_info(self, doubanid: str) -> Optional[dict]: """ 获取豆瓣信息 :param doubanid: 豆瓣ID :return: 识别的媒体信息,包括剧集信息 """ if not doubanid: return None douban_info = self.doubanapi.movie_detail(doubanid) if douban_info: celebrities = self.doubanapi.movie_celebrities(doubanid) if celebrities: douban_info["directors"] = celebrities.get("directors") douban_info["actors"] = celebrities.get("actors") else: douban_info = self.doubanapi.tv_detail(doubanid) celebrities = self.doubanapi.tv_celebrities(doubanid) if douban_info and celebrities: douban_info["directors"] = celebrities.get("directors") douban_info["actors"] = celebrities.get("actors") return self.__extend_doubaninfo(douban_info) @staticmethod def __extend_doubaninfo(doubaninfo: dict): """ 补充添加豆瓣信息 """ # 类型 if doubaninfo.get("type") == "movie": doubaninfo['media_type'] = MediaType.MOVIE elif doubaninfo.get("type") == "tv": doubaninfo['media_type'] = MediaType.TV else: return doubaninfo # 评分 rating = doubaninfo.get('rating') if rating: doubaninfo['vote_average'] = float(rating.get("value")) else: doubaninfo['vote_average'] = 0 # 海报 if doubaninfo.get("type") == "movie": poster_path = doubaninfo.get('cover', {}).get("url") if not poster_path: poster_path = doubaninfo.get('cover_url') if not poster_path: poster_path = doubaninfo.get('pic', {}).get("large") else: poster_path = doubaninfo.get('pic', {}).get("normal") if poster_path: poster_path = poster_path.replace("s_ratio_poster", "m_ratio_poster") doubaninfo['poster_path'] = poster_path # 简介 doubaninfo['overview'] = doubaninfo.get("card_subtitle") or "" return doubaninfo def search_medias(self, meta: MetaBase) -> Optional[List[MediaInfo]]: """ 搜索媒体信息 :param meta: 识别的元数据 :reutrn: 媒体信息 """ # 未启用豆瓣搜索时返回None if settings.SEARCH_SOURCE != "douban": return None if not meta.get_name(): return [] result = self.doubanapi.search(meta.get_name()) if not result: return [] # 返回数据 ret_medias = [] for item_obj in result.get("items"): if meta.type and meta.type.value != item_obj.get("type_name"): continue if item_obj.get("type_name") not in (MediaType.TV.value, MediaType.MOVIE.value): continue ret_medias.append(MediaInfo(douban_info=item_obj.get("target"))) return ret_medias def match(self, name: str, year: str, season: int = None) -> dict: """ 搜索和匹配豆瓣信息 """ result = self.doubanapi.search(f"{name} {year or ''}") if not result: return {} for item_obj in result.get("items"): if item_obj.get("type_name") not in (MediaType.TV.value, MediaType.MOVIE.value): continue title = item_obj.get("title") if not title: continue meta = MetaInfo(title) if meta.get_name() == name and (not season or meta.begin_season == season): return item_obj return {} def scrape_metadata(self, path: Path, mediainfo: MediaInfo) -> None: """ 刮削元数据 :param path: 媒体文件路径 :param mediainfo: 识别的媒体信息 :return: 成功或失败 """ if settings.SCRAP_SOURCE != "douban": return None # 目录下的所有文件 for file in SystemUtils.list_files_with_extensions(path, settings.RMT_MEDIAEXT): if not file: continue logger.info(f"开始刮削媒体库文件:{file} ...") try: meta = MetaInfo(file.stem) if not meta.get_name(): continue # 根据名称查询豆瓣数据 doubaninfo = self.match(name=mediainfo.title, year=mediainfo.year, season=meta.begin_season) if not doubaninfo: logger.warn(f"未找到 {mediainfo.title} 的豆瓣信息") break doubaninfo = self.__extend_doubaninfo(doubaninfo) # 刮削 self.gen_scraper_files(meta, doubaninfo, file) except Exception as e: logger.error(f"刮削文件 {file} 失败,原因:{e}") logger.info(f"{file} 刮削完成") def gen_scraper_files(self, meta: MetaBase, doubaninfo: dict, file_path: Path): """ 生成刮削文件 :param meta: 元数据 :param doubaninfo: 豆瓣信息 :param file_path: 文件路径 """ try: # 电影 if meta.type == MediaType.MOVIE: # 强制或者不已存在时才处理 if not file_path.with_name("movie.nfo").exists() \ and not file_path.with_suffix(".nfo").exists(): # 生成电影描述文件 self.__gen_movie_nfo_file(doubaninfo=doubaninfo, file_path=file_path) # 生成电影图片 self.__save_image(url=doubaninfo.get('poster_path'), file_path=file_path.with_name(f"poster{Path(doubaninfo.get('poster_path')).suffix}")) # 电视剧 else: # 不存在时才处理 if not file_path.parent.with_name("tvshow.nfo").exists(): # 根目录描述文件 self.__gen_tv_nfo_file(doubaninfo=doubaninfo, dir_path=file_path.parents[1]) # 生成根目录图片 self.__save_image(url=doubaninfo.get('poster_path'), file_path=file_path.with_name(f"poster{Path(doubaninfo.get('poster_path')).suffix}")) # 季目录NFO if not file_path.with_name("season.nfo").exists(): self.__gen_tv_season_nfo_file(seasoninfo=doubaninfo, season=meta.begin_season, season_path=file_path.parent) except Exception as e: logger.error(f"{file_path} 刮削失败:{e}") @staticmethod def __gen_common_nfo(doubaninfo: dict, doc, root): # 添加时间 DomUtils.add_node(doc, root, "dateadded", time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) # 简介 xplot = DomUtils.add_node(doc, root, "plot") xplot.appendChild(doc.createCDATASection(doubaninfo.get('overview') or "")) xoutline = DomUtils.add_node(doc, root, "outline") xoutline.appendChild(doc.createCDATASection(doubaninfo.get('.overview') or "")) # 导演 for director in doubaninfo.get('directors'): DomUtils.add_node(doc, root, "director", director.get("name") or "") # 演员 for actor in doubaninfo.get('actors'): xactor = DomUtils.add_node(doc, root, "actor") DomUtils.add_node(doc, xactor, "name", actor.get("name") or "") DomUtils.add_node(doc, xactor, "type", "Actor") DomUtils.add_node(doc, xactor, "role", actor.get("character") or actor.get("role") or "") DomUtils.add_node(doc, xactor, "thumb", actor.get('avatar', {}).get('normal')) DomUtils.add_node(doc, xactor, "profile", actor.get('url')) # 评分 DomUtils.add_node(doc, root, "rating", doubaninfo.get('vote_average') or "0") return doc def __gen_movie_nfo_file(self, doubaninfo: dict, file_path: Path): """ 生成电影的NFO描述文件 :param doubaninfo: 豆瓣信息 :param file_path: 电影文件路径 """ # 开始生成XML logger.info(f"正在生成电影NFO文件:{file_path.name}") doc = minidom.Document() root = DomUtils.add_node(doc, doc, "movie") # 公共部分 doc = self.__gen_common_nfo(doubaninfo=doubaninfo, doc=doc, root=root) # 标题 DomUtils.add_node(doc, root, "title", doubaninfo.get('title') or "") # 年份 DomUtils.add_node(doc, root, "year", doubaninfo.get('year') or "") # 保存 self.__save_nfo(doc, file_path.with_suffix(".nfo")) def __gen_tv_nfo_file(self, doubaninfo: dict, dir_path: Path): """ 生成电视剧的NFO描述文件 :param doubaninfo: 媒体信息 :param dir_path: 电视剧根目录 """ # 开始生成XML logger.info(f"正在生成电视剧NFO文件:{dir_path.name}") doc = minidom.Document() root = DomUtils.add_node(doc, doc, "tvshow") # 公共部分 doc = self.__gen_common_nfo(doubaninfo=doubaninfo, doc=doc, root=root) # 标题 DomUtils.add_node(doc, root, "title", doubaninfo.get('title') or "") # 年份 DomUtils.add_node(doc, root, "year", doubaninfo.get('year') or "") DomUtils.add_node(doc, root, "season", "-1") DomUtils.add_node(doc, root, "episode", "-1") # 保存 self.__save_nfo(doc, dir_path.joinpath("tvshow.nfo")) def __gen_tv_season_nfo_file(self, seasoninfo: dict, season: int, season_path: Path): """ 生成电视剧季的NFO描述文件 :param seasoninfo: TMDB季媒体信息 :param season: 季号 :param season_path: 电视剧季的目录 """ logger.info(f"正在生成季NFO文件:{season_path.name}") doc = minidom.Document() root = DomUtils.add_node(doc, doc, "season") # 添加时间 DomUtils.add_node(doc, root, "dateadded", time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) # 简介 xplot = DomUtils.add_node(doc, root, "plot") xplot.appendChild(doc.createCDATASection(seasoninfo.get("overview") or "")) xoutline = DomUtils.add_node(doc, root, "outline") xoutline.appendChild(doc.createCDATASection(seasoninfo.get("overview") or "")) # 标题 DomUtils.add_node(doc, root, "title", "季 %s" % season) # 发行日期 DomUtils.add_node(doc, root, "premiered", seasoninfo.get("air_date") or "") DomUtils.add_node(doc, root, "releasedate", seasoninfo.get("air_date") or "") # 发行年份 DomUtils.add_node(doc, root, "year", seasoninfo.get("air_date")[:4] if seasoninfo.get("air_date") else "") # seasonnumber DomUtils.add_node(doc, root, "seasonnumber", str(season)) # 保存 self.__save_nfo(doc, season_path.joinpath("season.nfo")) def __gen_tv_episode_nfo_file(self, episodeinfo: dict, season: int, episode: int, file_path: Path, force_nfo: bool = False): """ 生成电视剧集的NFO描述文件 :param episodeinfo: 集TMDB元数据 :param season: 季号 :param episode: 集号 :param file_path: 集文件的路径 :param force_nfo: 是否强制生成NFO文件 """ if not force_nfo and file_path.with_suffix(".nfo").exists(): return # 开始生成集的信息 logger.info(f"正在生成剧集NFO文件:{file_path.name}") doc = minidom.Document() root = DomUtils.add_node(doc, doc, "episodedetails") # 添加时间 DomUtils.add_node(doc, root, "dateadded", time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) # TMDBID uniqueid = DomUtils.add_node(doc, root, "uniqueid", episodeinfo.get("id") or "") uniqueid.setAttribute("type", "tmdb") uniqueid.setAttribute("default", "true") # tmdbid DomUtils.add_node(doc, root, "tmdbid", episodeinfo.get("id") or "") # 标题 DomUtils.add_node(doc, root, "title", episodeinfo.get("name") or "第 %s 集" % episode) # 简介 xplot = DomUtils.add_node(doc, root, "plot") xplot.appendChild(doc.createCDATASection(episodeinfo.get("overview") or "")) xoutline = DomUtils.add_node(doc, root, "outline") xoutline.appendChild(doc.createCDATASection(episodeinfo.get("overview") or "")) # 发布日期 DomUtils.add_node(doc, root, "aired", episodeinfo.get("air_date") or "") # 年份 DomUtils.add_node(doc, root, "year", episodeinfo.get("air_date")[:4] if episodeinfo.get("air_date") else "") # 季 DomUtils.add_node(doc, root, "season", str(season)) # 集 DomUtils.add_node(doc, root, "episode", str(episode)) # 评分 DomUtils.add_node(doc, root, "rating", episodeinfo.get("vote_average") or "0") # 导演 directors = episodeinfo.get("crew") or [] for director in directors: if director.get("known_for_department") == "Directing": xdirector = DomUtils.add_node(doc, root, "director", director.get("name") or "") xdirector.setAttribute("tmdbid", str(director.get("id") or "")) # 演员 actors = episodeinfo.get("guest_stars") or [] for actor in actors: if actor.get("known_for_department") == "Acting": xactor = DomUtils.add_node(doc, root, "actor") DomUtils.add_node(doc, xactor, "name", actor.get("name") or "") DomUtils.add_node(doc, xactor, "type", "Actor") DomUtils.add_node(doc, xactor, "tmdbid", actor.get("id") or "") # 保存文件 self.__save_nfo(doc, file_path.with_suffix(".nfo")) @staticmethod def __save_image(url: str, file_path: Path): """ 下载图片并保存 """ if file_path.exists(): return try: logger.info(f"正在下载{file_path.stem}图片:{url} ...") r = RequestUtils().get_res(url=url) if r: file_path.write_bytes(r.content) logger.info(f"图片已保存:{file_path}") else: logger.info(f"{file_path.stem}图片下载失败,请检查网络连通性") except Exception as err: logger.error(f"{file_path.stem}图片下载失败:{err}") @staticmethod def __save_nfo(doc, file_path: Path): """ 保存NFO """ if file_path.exists(): return xml_str = doc.toprettyxml(indent=" ", encoding="utf-8") file_path.write_bytes(xml_str) logger.info(f"NFO文件已保存:{file_path}")