fix themoviedb scraper

This commit is contained in:
jxxghp
2023-06-06 20:24:04 +08:00
parent 9047e0252a
commit cfe90a4522
12 changed files with 687 additions and 139 deletions

View File

@ -1,3 +1,4 @@
from pathlib import Path
from typing import List, Optional
from app.chain import _ChainBase
@ -35,9 +36,12 @@ class TransferChain(_ChainBase):
# 更新媒体图片
self.run_module("obtain_image", mediainfo=mediainfo)
# 转移
result: bool = self.run_module("transfer", mediainfo=mediainfo, path=torrent.get("path"))
if not result:
dest_path: Path = self.run_module("transfer", mediainfo=mediainfo, path=torrent.get("path"))
if not dest_path:
logger.warn(f"{torrent.get('title')} 转移失败")
return False
# 刮削
self.run_module("scrape_metadata", path=dest_path, mediainfo=mediainfo)
logger.info("下载器文件转移执行完成")
return True

View File

@ -1,4 +1,4 @@
from typing import Optional, Any
from typing import Optional, Any, List
from app.core.config import settings
from app.core.meta import MetaBase
@ -126,6 +126,10 @@ class MediaInfo(object):
tmdb_info: Optional[dict] = {}
# 豆瓣 INFO
douban_info: Optional[dict] = {}
# 导演
directors: List[dict] = []
# 演员
actors: List[dict] = []
def __init__(self, tmdb_info: dict = None, douban_info: dict = None):
if tmdb_info:
@ -155,6 +159,61 @@ class MediaInfo(object):
"""
初始化媒体信息
"""
def __directors_actors(tmdbinfo: dict):
    """
    Pick the directors and the actors out of TMDB metadata.

    The people are read from ``tmdbinfo["credits"]``: the ``cast`` list
    (person dicts carrying e.g. ``id``, ``name``, ``known_for_department``,
    ``character``, ``order``, ``profile_path``) and the ``crew`` list
    (person dicts carrying e.g. ``id``, ``name``, ``department``, ``job``).

    :param tmdbinfo: TMDB metadata
    :return: tuple of (list of directors, list of actors)
    """
    _credits = tmdbinfo.get("credits") if tmdbinfo else None
    if not _credits:
        return [], []
    # Cast members count as actors only when they are known for acting
    actors = [
        person for person in self.__dict_media_casts(_credits.get("cast"))
        if person.get("known_for_department") == "Acting"
    ]
    # Crew members count as directors only when their job is "Director"
    directors = [
        person for person in self.__dict_media_crews(_credits.get("crew"))
        if person.get("job") == "Director"
    ]
    return directors, actors
if not info:
return
# 本体
@ -208,6 +267,8 @@ class MediaInfo(object):
# 背景
if info.get('backdrop_path'):
self.backdrop_path = f"https://{settings.TMDB_IMAGE_DOMAIN}{info.get('backdrop_path')}"
# 导演和演员
self.directors, self.actors = __directors_actors(info)
def set_douban_info(self, info: dict):
"""

View File

@ -147,12 +147,12 @@ class _ModuleBase(metaclass=ABCMeta):
"""
pass
def transfer(self, path: str, mediainfo: MediaInfo) -> Optional[bool]:
def transfer(self, path: str, mediainfo: MediaInfo) -> Optional[str]:
"""
转移一个路径下的文件
:param path: 文件路径
:param mediainfo: 识别的媒体信息
:return: 成功或失败
:return: 转移后的目录或None代表失败
"""
pass

View File

@ -1,3 +1,4 @@
from pathlib import Path
from typing import List, Optional, Tuple, Union
from app.core import MediaInfo, settings
@ -104,7 +105,7 @@ class Douban(_ModuleBase):
return ret_medias
def scrape_metadata(self, path: str, mediainfo: MediaInfo) -> None:
def scrape_metadata(self, path: Path, mediainfo: MediaInfo) -> None:
"""
TODO 刮削元数据
:param path: 媒体文件路径

View File

@ -53,7 +53,7 @@ class FanartModule(_ModuleBase):
"""
转换Fanart图片的名字
"""
words_to_remove = r'tv|movie|hdmovie|hdtv'
words_to_remove = r'tv|movie|hdmovie|hdtv|show|hd'
pattern = re.compile(words_to_remove, re.IGNORECASE)
result = re.sub(pattern, '', fanart_name)
return result

View File

@ -23,12 +23,12 @@ class FileTransferModule(_ModuleBase):
def init_setting(self) -> Tuple[str, Union[str, bool]]:
pass
def transfer(self, path: str, mediainfo: MediaInfo) -> Optional[bool]:
def transfer(self, path: str, mediainfo: MediaInfo) -> Optional[str]:
"""
文件转移
:param path: 文件路径
:param mediainfo: 识别的媒体信息
:return: 成功或失败
:return: 转移后的目录或None代表失败
"""
if not settings.LIBRARY_PATH:
logger.error("未设置媒体库目录,无法转移文件")
@ -254,7 +254,7 @@ class FileTransferModule(_ModuleBase):
logger.warn(f"{new_file} 文件已存在")
continue
if not new_file.parent.exists():
new_file.parent.mkdir(parents=True)
new_file.parent.mkdir(parents=True, exist_ok=True)
retcode = self.__transfer_command(file_item=file,
target_file=new_file,
rmt_mode=rmt_mode)
@ -311,7 +311,7 @@ class FileTransferModule(_ModuleBase):
meidainfo: MediaInfo,
rmt_mode: str = None,
target_dir: Path = None
) -> Tuple[bool, str]:
) -> Tuple[Optional[Path], str]:
"""
识别并转移一个文件、多个文件或者目录
:param in_path: 转移的路径,可能是一个文件也可以是一个目录
@ -322,10 +322,10 @@ class FileTransferModule(_ModuleBase):
"""
# 检查目录路径
if not in_path.exists():
return False, f"路径不存在:{in_path}"
return None, f"路径不存在:{in_path}"
if not target_dir.exists():
return False, f"目标路径不存在:{target_dir}"
return None, f"目标路径不存在:{target_dir}"
# 目的目录加上类型和二级分类
target_dir = target_dir / meidainfo.type.value / meidainfo.category
@ -351,18 +351,47 @@ class FileTransferModule(_ModuleBase):
new_path=new_path,
rmt_mode=rmt_mode)
if retcode != 0:
return False, f"蓝光原盘转移失败,错误码:{retcode}"
return None, f"蓝光原盘转移失败,错误码:{retcode}"
else:
return True, ""
# 返回转移后的路径
return new_path, ""
else:
# 获取文件清单
transfer_files: List[Path] = SystemUtils.list_files_with_extensions(in_path, settings.RMT_MEDIAEXT)
if len(transfer_files) == 0:
return False, f"目录下没有找到可转移的文件:{in_path}"
return None, f"目录下没有找到可转移的文件:{in_path}"
# 识别目录名称,不包括后缀
meta = MetaInfo(in_path.stem)
# 目的路径
new_path = target_dir / self.get_rename_path(
template_string=rename_format,
rename_dict=self.__get_naming_dict(meta=meta,
mediainfo=meidainfo)
).parents[-2].name
# 转移所有文件
for transfer_file in transfer_files:
try:
# 识别文件元数据,不包含后缀
meta = MetaInfo(transfer_file.stem)
file_meta = MetaInfo(transfer_file.stem)
# 组合目录和文件的Meta信息
meta.type = file_meta.type
# 开始季
if file_meta.begin_season:
meta.begin_season = file_meta.begin_season
# 开始集
if file_meta.begin_episode:
meta.begin_episode = file_meta.begin_episode
# 结束集
if file_meta.end_episode:
meta.end_episode = file_meta.end_episode
# 总季数
if file_meta.total_seasons:
meta.total_seasons = file_meta.total_seasons
# 总集数
if file_meta.total_episodes:
meta.total_episodes = file_meta.total_episodes
# 结束季为空
meta.end_season = None
# 目的文件名
new_file = self.get_rename_path(
path=target_dir,
@ -383,9 +412,11 @@ class FileTransferModule(_ModuleBase):
rmt_mode=rmt_mode,
over_flag=overflag)
if retcode != 0:
return False, f"文件转移失败,错误码:{retcode}"
return None, f"文件转移失败,错误码:{retcode}"
except Exception as err:
return None, f"文件转移失败,错误信息:{err}"
return True, ""
return new_path, ""
@staticmethod
def __get_naming_dict(meta: MetaBase, mediainfo: MediaInfo, file_ext: str = None) -> dict:
@ -449,7 +480,7 @@ class FileTransferModule(_ModuleBase):
pass
@staticmethod
def get_rename_path(path: Path, template_string: str, rename_dict: dict) -> Path:
def get_rename_path(template_string: str, rename_dict: dict, path: Path = None) -> Path:
"""
生成重命名后的完整路径
"""
@ -458,4 +489,7 @@ class FileTransferModule(_ModuleBase):
# 渲染生成的字符串
render_str = template.render(rename_dict)
# 目的路径
if path:
return path / render_str
else:
return Path(render_str)

View File

@ -3,7 +3,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import List, Optional, Tuple, Union
from app.core import MediaInfo, TorrentInfo, settings
from app.core import MediaInfo, TorrentInfo
from app.log import logger
from app.modules import _ModuleBase
from app.modules.indexer.spider import TorrentSpider

View File

@ -85,7 +85,7 @@ class TNodeSpider:
'title': result.get('title'),
'description': result.get('subtitle'),
'enclosure': self._downloadurl % (self._domain, result.get('id')),
'pubdate': StringUtils.timestamp_to_date(result.get('upload_time')),
'pubdate': StringUtils.format_timestamp(result.get('upload_time')),
'size': result.get('size'),
'seeders': result.get('seeding'),
'peers': result.get('leeching'),

View File

@ -43,7 +43,7 @@ class TorrentLeech:
'indexer': self._indexer.get('id'),
'title': result.get('name'),
'enclosure': self._downloadurl % (self._indexer.get('domain'), result.get('fid'), result.get('filename')),
'pubdate': StringUtils.timestamp_to_date(result.get('addedTimestamp')),
'pubdate': StringUtils.format_timestamp(result.get('addedTimestamp')),
'size': result.get('size'),
'seeders': result.get('seeders'),
'peers': result.get('leechers'),

View File

@ -1,11 +1,18 @@
import time
from pathlib import Path
from typing import Optional, List, Tuple, Union
from xml.dom import minidom
from app.core import settings, MediaInfo
from app.core import settings, MediaInfo, MetaInfo
from app.core.meta import MetaBase
from app.log import logger
from app.modules import _ModuleBase
from app.modules.themoviedb.category import CategoryHelper
from app.modules.themoviedb.tmdb import TmdbHelper
from app.modules.themoviedb.tmdb_cache import TmdbCache
from app.utils.dom import DomUtils
from app.utils.http import RequestUtils
from app.utils.system import SystemUtils
from app.utils.types import MediaType
@ -44,52 +51,50 @@ class TheMovieDb(_ModuleBase):
# 缓存没有或者强制不使用缓存
if tmdbid:
# 直接查询详情
info = self.tmdb.get_tmdb_info(mtype=meta.type, tmdbid=tmdbid)
info = self.tmdb.get_info(mtype=meta.type, tmdbid=tmdbid)
else:
if meta.type != MediaType.TV and not meta.year:
info = self.tmdb.search_multi_tmdb(meta.get_name())
info = self.tmdb.search_multi(meta.get_name())
else:
if meta.type == MediaType.TV:
# 确定是电视
info = self.tmdb.search_tmdb(name=meta.get_name(),
info = self.tmdb.match(name=meta.get_name(),
year=meta.year,
mtype=meta.type,
season_year=meta.year,
season_number=meta.begin_season
)
season_number=meta.begin_season)
if meta.year:
# 非严格模式下去掉年份再查一次
info = self.tmdb.search_tmdb(name=meta.get_name(),
info = self.tmdb.match(name=meta.get_name(),
mtype=meta.type)
else:
# 有年份先按电影查
info = self.tmdb.search_tmdb(name=meta.get_name(),
info = self.tmdb.match(name=meta.get_name(),
year=meta.year,
mtype=MediaType.MOVIE)
# 没有再按电视剧查
if not info:
info = self.tmdb.search_tmdb(name=meta.get_name(),
info = self.tmdb.match(name=meta.get_name(),
year=meta.year,
mtype=MediaType.TV
)
mtype=MediaType.TV)
if not info:
# 非严格模式下去掉年份和类型再查一次
info = self.tmdb.search_multi_tmdb(name=meta.get_name())
info = self.tmdb.search_multi(name=meta.get_name())
if not info:
# 从网站查询
info = self.tmdb.search_tmdb_web(name=meta.get_name(),
info = self.tmdb.search_web(name=meta.get_name(),
mtype=meta.type)
# 补充全量信息
if info and not info.get("genres"):
info = self.tmdb.get_tmdb_info(mtype=info.get("media_type"),
info = self.tmdb.get_info(mtype=info.get("media_type"),
tmdbid=info.get("id"))
# 保存到缓存
self.cache.update(meta, info)
else:
# 使用缓存信息
if cache_info.get("title"):
info = self.tmdb.get_tmdb_info(mtype=cache_info.get("type"),
info = self.tmdb.get_info(mtype=cache_info.get("type"),
tmdbid=cache_info.get("id"))
else:
info = None
@ -118,11 +123,11 @@ class TheMovieDb(_ModuleBase):
if not meta.get_name():
return []
if not meta.type and not meta.year:
results = self.tmdb.search_multi_tmdbinfos(meta.get_name())
results = self.tmdb.search_multiis(meta.get_name())
else:
if not meta.type:
results = list(
set(self.tmdb.search_movie_tmdbinfos(meta.get_name(), meta.year))
set(self.tmdb.search_movies(meta.get_name(), meta.year))
.union(set(self.tmdb.search_tv_tmdbinfos(meta.get_name(), meta.year)))
)
# 组合结果的情况下要排序
@ -132,18 +137,367 @@ class TheMovieDb(_ModuleBase):
reverse=True
)
elif meta.type == MediaType.MOVIE:
results = self.tmdb.search_movie_tmdbinfos(meta.get_name(), meta.year)
results = self.tmdb.search_movies(meta.get_name(), meta.year)
else:
results = self.tmdb.search_tv_tmdbinfos(meta.get_name(), meta.year)
return [MediaInfo(tmdb_info=info) for info in results]
def scrape_metadata(self, path: str, mediainfo: MediaInfo) -> None:
def scrape_metadata(self, path: Path, mediainfo: MediaInfo,
                    force_nfo: bool = False, force_pic: bool = False) -> None:
    """
    Scrape metadata for every media file found under a path.

    Nothing is done unless ``settings.SCRAP_SOURCE`` is ``"themoviedb"``.

    :param path: media file path
    :param mediainfo: recognized media information
    :param force_nfo: force (re)generation of NFO files
    :param force_pic: force (re)download of images
    :return: None
    """
    if settings.SCRAP_SOURCE != "themoviedb":
        return None
    # Every file under the path whose extension is in settings.RMT_MEDIAEXT
    media_files = SystemUtils.list_files_with_extensions(path, settings.RMT_MEDIAEXT)
    for media_file in media_files:
        if not media_file:
            continue
        logger.info(f"开始刮削媒体库文件:{media_file} ...")
        self.gen_scraper_files(mediainfo=mediainfo,
                               file_path=media_file,
                               force_nfo=force_nfo,
                               force_pic=force_pic)
        logger.info(f"{media_file} 刮削完成")
def gen_scraper_files(self, mediainfo: MediaInfo, file_path: Path,
                      force_nfo: bool = False, force_pic: bool = False):
    """
    Generate the scraper files (NFO descriptions and images) for one media file.

    For a movie the NFO and the images are placed next to the file. For
    anything else the file is treated as a TV episode: show-level files go
    to ``file_path.parents[1]``, season-level files to ``file_path.parent``
    and episode-level files next to the episode file itself.

    Any exception is caught and logged; nothing is raised to the caller.

    :param mediainfo: media information
    :param file_path: path of the media file
    :param force_nfo: force (re)generation of NFO files
    :param force_pic: force (re)download of images
    """

    def __get_episode_detail(_seasoninfo: dict, _episode: int):
        """
        Return the entry of ``_seasoninfo["episodes"]`` whose
        ``episode_number`` equals ``_episode``, or ``{}`` when there is none.
        """
        for _episode_info in _seasoninfo.get("episodes") or []:
            if _episode_info.get("episode_number") == _episode:
                return _episode_info
        return {}

    def __is_image_url(_value) -> bool:
        """
        True when the attribute value is a string that starts with "http".
        The isinstance guard keeps a non-string attribute from raising
        AttributeError and aborting the whole scrape of this file.
        """
        return isinstance(_value, str) and _value.startswith("http")

    try:
        # Movie
        if mediainfo.type == MediaType.MOVIE:
            # Only when forced, or when no NFO exists yet
            if force_nfo \
                    or (not file_path.with_name("movie.nfo").exists()
                        and not file_path.with_suffix(".nfo").exists()):
                # Movie description file. force_nfo has to be forwarded,
                # otherwise an existing NFO is never overwritten on a
                # forced scrape.
                self.__gen_movie_nfo_file(mediainfo=mediainfo,
                                          file_path=file_path,
                                          force_nfo=force_nfo)
            # Movie images: every "*_path" attribute holding a URL
            for attr_name, attr_value in vars(mediainfo).items():
                if attr_value \
                        and attr_name.endswith("_path") \
                        and __is_image_url(attr_value):
                    image_name = attr_name.replace("_path", "") + Path(attr_value).suffix
                    self.__save_image(url=attr_value,
                                      file_path=file_path.parent / image_name,
                                      force=force_pic)
        # TV show
        else:
            # Recognize season/episode from the file name (without suffix)
            meta = MetaInfo(file_path.stem)
            # Show-level NFO
            if force_nfo \
                    or (not file_path.with_name("tvshow.nfo").exists()
                        and not file_path.with_suffix(".nfo").exists()):
                # Show description file
                self.__gen_tv_nfo_file(mediainfo=mediainfo,
                                       dir_path=file_path.parents[1],
                                       force_nfo=force_nfo)
            # Show-level images: "*_path" attributes that are not season ones
            for attr_name, attr_value in vars(mediainfo).items():
                if attr_value \
                        and attr_name.endswith("_path") \
                        and not attr_name.startswith("season") \
                        and __is_image_url(attr_value):
                    image_name = attr_name.replace("_path", "") + Path(attr_value).suffix
                    self.__save_image(url=attr_value,
                                      file_path=file_path.parents[1] / image_name,
                                      force=force_pic)
            # Query the season information
            seasoninfo = self.tmdb.get_tv_season_detail(mediainfo.tmdb_id, meta.begin_season)
            if seasoninfo:
                # Season-level NFO
                self.__gen_tv_season_nfo_file(seasoninfo=seasoninfo,
                                              season=meta.begin_season,
                                              season_path=file_path.parent,
                                              force_nfo=force_nfo)
                # Season images: "season*" attributes holding a URL; the
                # "season" part of the name becomes the zero-padded season
                # number followed by "-"
                for attr_name, attr_value in vars(mediainfo).items():
                    if attr_value \
                            and attr_name.startswith("season") \
                            and __is_image_url(attr_value):
                        image_name = attr_name.replace("_path",
                                                       "").replace("season",
                                                                   f"{str(meta.begin_season).rjust(2, '0')}-") \
                                     + Path(attr_value).suffix
                        self.__save_image(url=attr_value,
                                          file_path=file_path.parent / image_name,
                                          force=force_pic)
            # Look the episode up inside the season information
            episodeinfo = __get_episode_detail(seasoninfo, meta.begin_episode)
            if episodeinfo:
                # Episode NFO
                self.__gen_tv_episode_nfo_file(episodeinfo=episodeinfo,
                                               season=meta.begin_season,
                                               episode=meta.begin_episode,
                                               file_path=file_path,
                                               force_nfo=force_nfo)
                # Episode still image, saved next to the episode file
                if episodeinfo.get('still_path'):
                    self.__save_image(f"https://image.tmdb.org/t/p/original{episodeinfo.get('still_path')}",
                                      file_path.with_suffix(".jpg"),
                                      force_pic)
    except Exception as e:
        logger.error(f"{file_path} 刮削失败:{e}")
@staticmethod
def __gen_common_nfo(mediainfo: MediaInfo, doc, root):
    """
    Append the NFO nodes shared by movies and TV shows under ``root``.

    Adds, in this order: date added, TMDB/TVDB/IMDB ids, plot and outline,
    directors, actors, genres, rating and (when a rated US release exists)
    the MPAA certification.

    :param mediainfo: recognized media information
    :param doc: the XML document being built
    :param root: the node the new nodes are appended to
    :return: the same ``doc``
    """
    add_node = DomUtils.add_node
    # Raw TMDB metadata
    info = mediainfo.tmdb_info
    # Date added
    now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    add_node(doc, root, "dateadded", now)
    # TMDB
    add_node(doc, root, "tmdbid", mediainfo.tmdb_id or "")
    tmdb_uid = add_node(doc, root, "uniqueid", mediainfo.tmdb_id or "")
    tmdb_uid.setAttribute("type", "tmdb")
    tmdb_uid.setAttribute("default", "true")
    # TVDB
    if mediainfo.tvdb_id:
        add_node(doc, root, "tvdbid", mediainfo.tvdb_id)
        tvdb_uid = add_node(doc, root, "uniqueid", mediainfo.tvdb_id)
        tvdb_uid.setAttribute("type", "tvdb")
    # IMDB: when present it becomes the default unique id instead of TMDB
    if mediainfo.imdb_id:
        add_node(doc, root, "imdbid", mediainfo.imdb_id)
        imdb_uid = add_node(doc, root, "uniqueid", mediainfo.imdb_id)
        imdb_uid.setAttribute("type", "imdb")
        imdb_uid.setAttribute("default", "true")
        tmdb_uid.setAttribute("default", "false")
    # Overview, written as CDATA into both <plot> and <outline>
    for tag in ("plot", "outline"):
        text_node = add_node(doc, root, tag)
        text_node.appendChild(doc.createCDATASection(mediainfo.overview or ""))
    # Directors
    for person in mediainfo.directors:
        director_node = add_node(doc, root, "director", person.get("name") or "")
        director_node.setAttribute("tmdbid", str(person.get("id") or ""))
    # Actors
    for person in mediainfo.actors:
        actor_node = add_node(doc, root, "actor")
        add_node(doc, actor_node, "name", person.get("name") or "")
        add_node(doc, actor_node, "type", "Actor")
        add_node(doc, actor_node, "role", person.get("character") or person.get("role") or "")
        # An order of 0 is a valid value, so test against None explicitly
        order = person.get("order")
        add_node(doc, actor_node, "order", order if order is not None else "")
        add_node(doc, actor_node, "tmdbid", person.get("id") or "")
        add_node(doc, actor_node, "thumb", person.get('image'))
        add_node(doc, actor_node, "profile", person.get('profile'))
    # Genres
    for genre in info.get("genres") or []:
        add_node(doc, root, "genre", genre.get("name") or "")
    # Rating
    add_node(doc, root, "rating", mediainfo.vote_average or "0")
    # Certification: the first US release with a non-blank certification
    # is used (the original comment notes there is no domestic rating)
    countries = (info.get("releases") or {}).get("countries")
    for country in countries or []:
        certification = country.get("certification")
        if certification and certification.strip() \
                and country.get("iso_3166_1") == "US":
            add_node(doc, root, "mpaa", certification)
            break
    return doc
def __gen_movie_nfo_file(self,
                         mediainfo: MediaInfo,
                         file_path: Path,
                         force_nfo: bool = False):
    """
    Build the movie NFO and save it next to the movie file, using the
    movie file's name with the suffix replaced by ``.nfo``.

    :param mediainfo: recognized media information
    :param file_path: path of the movie file
    :param force_nfo: overwrite an existing NFO file
    """
    logger.info(f"正在生成电影NFO文件:{file_path.name}")
    document = minidom.Document()
    movie_node = DomUtils.add_node(document, document, "movie")
    # Nodes shared with TV shows
    document = self.__gen_common_nfo(mediainfo=mediainfo,
                                     doc=document,
                                     root=movie_node)
    # Title, original title, release date and year
    simple_nodes = (
        ("title", mediainfo.title or ""),
        ("originaltitle", mediainfo.original_title or ""),
        ("premiered", mediainfo.release_date or ""),
        ("year", mediainfo.year or ""),
    )
    for tag, text in simple_nodes:
        DomUtils.add_node(document, movie_node, tag, text)
    # Save
    nfo_path = file_path.with_suffix(".nfo")
    self.__save_nfo(document, nfo_path, force=force_nfo)
def __gen_tv_nfo_file(self,
                      mediainfo: MediaInfo,
                      dir_path: Path,
                      force_nfo: bool = False):
    """
    Build the TV show NFO and save it as ``tvshow.nfo`` inside the show's
    root directory.

    :param mediainfo: media information
    :param dir_path: root directory of the TV show
    :param force_nfo: overwrite an existing NFO file
    """
    # Start building the XML
    logger.info(f"正在生成电视剧NFO文件:{dir_path.name}")
    doc = minidom.Document()
    root = DomUtils.add_node(doc, doc, "tvshow")
    # Nodes shared with movies
    doc = self.__gen_common_nfo(mediainfo=mediainfo,
                                doc=doc,
                                root=root)
    # Title
    DomUtils.add_node(doc, root, "title", mediainfo.title or "")
    DomUtils.add_node(doc, root, "originaltitle", mediainfo.original_title or "")
    # Release date
    DomUtils.add_node(doc, root, "premiered", mediainfo.release_date or "")
    # Year
    DomUtils.add_node(doc, root, "year", mediainfo.year or "")
    DomUtils.add_node(doc, root, "season", "-1")
    DomUtils.add_node(doc, root, "episode", "-1")
    # Save inside the show directory. Path.with_name() would replace the
    # directory's own name and put the file beside the show directory.
    self.__save_nfo(doc, dir_path / "tvshow.nfo", force_nfo)
def __gen_tv_season_nfo_file(self, seasoninfo: dict, season: int, season_path: Path,
                             force_nfo: bool = False):
    """
    Build the season NFO and save it as ``season.nfo`` inside the season
    directory.

    :param seasoninfo: TMDB season information
    :param season: season number
    :param season_path: directory of the season
    :param force_nfo: overwrite an existing NFO file
    """
    logger.info(f"正在生成季NFO文件:{season_path.name}")
    doc = minidom.Document()
    root = DomUtils.add_node(doc, doc, "season")
    # Date added
    DomUtils.add_node(doc, root, "dateadded", time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    # Overview
    xplot = DomUtils.add_node(doc, root, "plot")
    xplot.appendChild(doc.createCDATASection(seasoninfo.get("overview") or ""))
    xoutline = DomUtils.add_node(doc, root, "outline")
    xoutline.appendChild(doc.createCDATASection(seasoninfo.get("overview") or ""))
    # Title
    DomUtils.add_node(doc, root, "title", "%s" % season)
    # Air date
    DomUtils.add_node(doc, root, "premiered", seasoninfo.get("air_date") or "")
    DomUtils.add_node(doc, root, "releasedate", seasoninfo.get("air_date") or "")
    # Air year: the first four characters of the air date
    DomUtils.add_node(doc, root, "year", seasoninfo.get("air_date")[:4] if seasoninfo.get("air_date") else "")
    # seasonnumber
    DomUtils.add_node(doc, root, "seasonnumber", str(season))
    # Save inside the season directory. Path.with_name() would write to the
    # parent directory instead, where the NFOs of different seasons collide.
    self.__save_nfo(doc, season_path / "season.nfo", force_nfo)
def __gen_tv_episode_nfo_file(self,
                              episodeinfo: dict,
                              season: int,
                              episode: int,
                              file_path: Path,
                              force_nfo: bool = False):
    """
    Build the episode NFO and save it next to the episode file, using the
    episode file's name with the suffix replaced by ``.nfo``.

    :param episodeinfo: TMDB metadata of the episode
    :param season: season number
    :param episode: episode number
    :param file_path: path of the episode file
    :param force_nfo: overwrite an existing NFO file
    """
    logger.info(f"正在生成剧集NFO文件:{file_path.name}")
    add_node = DomUtils.add_node
    document = minidom.Document()
    details = add_node(document, document, "episodedetails")
    # Date added
    add_node(document, details, "dateadded", time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    # TMDB id, both as the default unique id and as a plain node
    uid_node = add_node(document, details, "uniqueid", episodeinfo.get("id") or "")
    uid_node.setAttribute("type", "tmdb")
    uid_node.setAttribute("default", "true")
    add_node(document, details, "tmdbid", episodeinfo.get("id") or "")
    # Title: falls back to the episode number when TMDB has no name
    add_node(document, details, "title", episodeinfo.get("name") or "%s" % episode)
    # Overview, written as CDATA into both <plot> and <outline>
    for tag in ("plot", "outline"):
        text_node = add_node(document, details, tag)
        text_node.appendChild(document.createCDATASection(episodeinfo.get("overview") or ""))
    # Air date, and the year taken from its first four characters
    air_date = episodeinfo.get("air_date")
    add_node(document, details, "aired", air_date or "")
    add_node(document, details, "year", air_date[:4] if air_date else "")
    # Season and episode numbers
    add_node(document, details, "season", str(season))
    add_node(document, details, "episode", str(episode))
    # Rating
    add_node(document, details, "rating", episodeinfo.get("vote_average") or "0")
    # Directors: crew members known for directing
    for member in episodeinfo.get("crew") or []:
        if member.get("known_for_department") != "Directing":
            continue
        director_node = add_node(document, details, "director", member.get("name") or "")
        director_node.setAttribute("tmdbid", str(member.get("id") or ""))
    # Actors: guest stars known for acting
    for guest in episodeinfo.get("guest_stars") or []:
        if guest.get("known_for_department") != "Acting":
            continue
        actor_node = add_node(document, details, "actor")
        add_node(document, actor_node, "name", guest.get("name") or "")
        add_node(document, actor_node, "type", "Actor")
        add_node(document, actor_node, "tmdbid", guest.get("id") or "")
    # Save
    self.__save_nfo(document, file_path.with_suffix(".nfo"), force_nfo)
@staticmethod
def __save_image(url: str, file_path: Path, force: bool = False):
    """
    Download an image and save it to ``file_path``.

    Download or write failures are logged and never raised to the caller.

    :param url: URL of the image
    :param file_path: destination file of the image
    :param force: download and overwrite even when the file already exists
    """
    if not force and file_path.exists():
        return
    try:
        logger.info(f"正在下载{file_path.stem}图片:{url} ...")
        r = RequestUtils().get_res(url=url)
        if r:
            # Actually persist the download; previously success was logged
            # without anything being written to disk.
            # NOTE(review): assumes get_res() returns a requests-style
            # response whose body is available as `.content` - confirm.
            file_path.write_bytes(r.content)
            logger.info(f"图片已保存:{file_path.name}")
        else:
            logger.info(f"{file_path.stem}图片下载失败,请检查网络连通性")
    except Exception as err:
        logger.error(f"{file_path.stem}图片下载失败:{err}")
@staticmethod
def __save_nfo(doc, file_path: Path, force: bool = False):
    """
    Serialize an XML document and write it to ``file_path``.

    :param doc: the XML document to save
    :param file_path: destination NFO file
    :param force: overwrite the file when it already exists
    """
    already_there = file_path.exists() if not force else False
    if already_there:
        return
    logger.info(f"正在保存NFO文件:{file_path.name}")
    # toprettyxml() with an encoding returns bytes, hence write_bytes()
    payload = doc.toprettyxml(indent="  ", encoding="utf-8")
    file_path.write_bytes(payload)
    logger.info(f"NFO文件已保存:{file_path.name}")

View File

@ -3,7 +3,7 @@ from typing import Optional, Tuple, List
import zhconv
from lxml import etree
from tmdbv3api import TMDb, Search, Movie, TV
from tmdbv3api import TMDb, Search, Movie, TV, Season, Episode
from tmdbv3api.exceptions import TMDbException
from app.core import settings
@ -44,8 +44,10 @@ class TmdbHelper:
self.search = Search()
self.movie = Movie()
self.tv = TV()
self.season = Season()
self.episode = Episode()
def search_multi_tmdbinfos(self, title: str) -> List[dict]:
def search_multiis(self, title: str) -> List[dict]:
"""
同时查询模糊匹配的电影、电视剧TMDB信息
"""
@ -59,7 +61,7 @@ class TmdbHelper:
ret_infos.append(multi)
return ret_infos
def search_movie_tmdbinfos(self, title: str, year: str) -> List[dict]:
def search_movies(self, title: str, year: str) -> List[dict]:
"""
查询模糊匹配的所有电影TMDB信息
"""
@ -94,7 +96,7 @@ class TmdbHelper:
return ret_infos
@staticmethod
def __compare_tmdb_names(file_name: str, tmdb_names: list) -> bool:
def __compare_names(file_name: str, tmdb_names: list) -> bool:
"""
比较文件名是否匹配,忽略大小写和特殊字符
:param file_name: 识别的文件名或者种子名
@ -112,7 +114,7 @@ class TmdbHelper:
return True
return False
def __get_tmdb_names(self, mtype: MediaType, tmdb_id: str) -> Tuple[Optional[dict], List[str]]:
def __get_names(self, mtype: MediaType, tmdb_id: str) -> Tuple[Optional[dict], List[str]]:
"""
搜索tmdb中所有的标题和译名用于名称匹配
:param mtype: 类型:电影、电视剧、动漫
@ -122,7 +124,7 @@ class TmdbHelper:
if not mtype or not tmdb_id:
return {}, []
ret_names = []
tmdb_info = self.get_tmdb_info(mtype=mtype, tmdbid=tmdb_id)
tmdb_info = self.get_info(mtype=mtype, tmdbid=tmdb_id)
if not tmdb_info:
return tmdb_info, []
if mtype == MediaType.MOVIE:
@ -149,7 +151,7 @@ class TmdbHelper:
ret_names.append(name)
return tmdb_info, ret_names
def search_tmdb(self, name: str,
def match(self, name: str,
mtype: MediaType,
year: str = None,
season_year: str = None,
@ -239,16 +241,16 @@ class TmdbHelper:
if year:
for movie in movies:
if movie.get('release_date'):
if self.__compare_tmdb_names(name, movie.get('title')) \
if self.__compare_names(name, movie.get('title')) \
and movie.get('release_date')[0:4] == str(year):
return movie
if self.__compare_tmdb_names(name, movie.get('original_title')) \
if self.__compare_names(name, movie.get('original_title')) \
and movie.get('release_date')[0:4] == str(year):
return movie
else:
for movie in movies:
if self.__compare_tmdb_names(name, movie.get('title')) \
or self.__compare_tmdb_names(name, movie.get('original_title')):
if self.__compare_names(name, movie.get('title')) \
or self.__compare_names(name, movie.get('original_title')):
return movie
if not info:
index = 0
@ -259,13 +261,13 @@ class TmdbHelper:
if movie.get('release_date')[0:4] != str(year):
continue
index += 1
info, names = self.__get_tmdb_names(MediaType.MOVIE, movie.get("id"))
if self.__compare_tmdb_names(name, names):
info, names = self.__get_names(MediaType.MOVIE, movie.get("id"))
if self.__compare_names(name, names):
return info
else:
index += 1
info, names = self.__get_tmdb_names(MediaType.MOVIE, movie.get("id"))
if self.__compare_tmdb_names(name, names):
info, names = self.__get_names(MediaType.MOVIE, movie.get("id"))
if self.__compare_names(name, names):
return info
if index > 5:
break
@ -298,16 +300,16 @@ class TmdbHelper:
if year:
for tv in tvs:
if tv.get('first_air_date'):
if self.__compare_tmdb_names(name, tv.get('name')) \
if self.__compare_names(name, tv.get('name')) \
and tv.get('first_air_date')[0:4] == str(year):
return tv
if self.__compare_tmdb_names(name, tv.get('original_name')) \
if self.__compare_names(name, tv.get('original_name')) \
and tv.get('first_air_date')[0:4] == str(year):
return tv
else:
for tv in tvs:
if self.__compare_tmdb_names(name, tv.get('name')) \
or self.__compare_tmdb_names(name, tv.get('original_name')):
if self.__compare_names(name, tv.get('name')) \
or self.__compare_names(name, tv.get('original_name')):
return tv
if not info:
index = 0
@ -318,13 +320,13 @@ class TmdbHelper:
if tv.get('first_air_date')[0:4] != str(year):
continue
index += 1
info, names = self.__get_tmdb_names(MediaType.TV, tv.get("id"))
if self.__compare_tmdb_names(name, names):
info, names = self.__get_names(MediaType.TV, tv.get("id"))
if self.__compare_names(name, names):
return info
else:
index += 1
info, names = self.__get_tmdb_names(MediaType.TV, tv.get("id"))
if self.__compare_tmdb_names(name, names):
info, names = self.__get_names(MediaType.TV, tv.get("id"))
if self.__compare_names(name, names):
return info
if index > 5:
break
@ -343,7 +345,7 @@ class TmdbHelper:
if not tv_info:
return False
try:
seasons = self.__get_tmdb_tv_seasons(tv_info)
seasons = self.__get_tv_seasons(tv_info)
for season, season_info in seasons.values():
if season_info.get("air_date"):
if season.get("air_date")[0:4] == str(_season_year) \
@ -368,21 +370,21 @@ class TmdbHelper:
return {}
else:
for tv in tvs:
if (self.__compare_tmdb_names(name, tv.get('name'))
or self.__compare_tmdb_names(name, tv.get('original_name'))) \
if (self.__compare_names(name, tv.get('name'))
or self.__compare_names(name, tv.get('original_name'))) \
and (tv.get('first_air_date') and tv.get('first_air_date')[0:4] == str(season_year)):
return tv
for tv in tvs[:5]:
info, names = self.__get_tmdb_names(MediaType.TV, tv.get("id"))
if not self.__compare_tmdb_names(name, names):
info, names = self.__get_names(MediaType.TV, tv.get("id"))
if not self.__compare_names(name, names):
continue
if __season_match(tv_info=info, _season_year=season_year):
return info
return {}
@staticmethod
def __get_tmdb_tv_seasons(tv_info: dict) -> Optional[dict]:
def __get_tv_seasons(tv_info: dict) -> Optional[dict]:
"""
查询TMDB电视剧的所有季
:param tv_info: TMDB 的季信息
@ -419,7 +421,7 @@ class TmdbHelper:
ret_seasons[season_info.get("season_number")] = season_info
return ret_seasons
def search_multi_tmdb(self, name: str) -> Optional[dict]:
def search_multi(self, name: str) -> Optional[dict]:
"""
根据名称同时查询电影和电视剧,不带年份
:param name: 识别的文件名或种子名
@ -441,22 +443,22 @@ class TmdbHelper:
info = {}
for multi in multis:
if multi.get("media_type") == "movie":
if self.__compare_tmdb_names(name, multi.get('title')) \
or self.__compare_tmdb_names(name, multi.get('original_title')):
if self.__compare_names(name, multi.get('title')) \
or self.__compare_names(name, multi.get('original_title')):
info = multi
elif multi.get("media_type") == "tv":
if self.__compare_tmdb_names(name, multi.get('name')) \
or self.__compare_tmdb_names(name, multi.get('original_name')):
if self.__compare_names(name, multi.get('name')) \
or self.__compare_names(name, multi.get('original_name')):
info = multi
if not info:
for multi in multis[:5]:
if multi.get("media_type") == "movie":
movie_info, names = self.__get_tmdb_names(MediaType.MOVIE, multi.get("id"))
if self.__compare_tmdb_names(name, names):
movie_info, names = self.__get_names(MediaType.MOVIE, multi.get("id"))
if self.__compare_names(name, names):
info = movie_info
elif multi.get("media_type") == "tv":
tv_info, names = self.__get_tmdb_names(MediaType.TV, multi.get("id"))
if self.__compare_tmdb_names(name, names):
tv_info, names = self.__get_names(MediaType.TV, multi.get("id"))
if self.__compare_names(name, names):
info = tv_info
# 返回
if info:
@ -467,7 +469,7 @@ class TmdbHelper:
return info
@lru_cache(maxsize=128)
def search_tmdb_web(self, name: str, mtype: MediaType) -> Optional[dict]:
def search_web(self, name: str, mtype: MediaType) -> Optional[dict]:
"""
搜索TMDB网站直接抓取结果结果只有一条时才返回
:param name: 名称
@ -497,7 +499,7 @@ class TmdbHelper:
if link not in tmdb_links:
tmdb_links.append(link)
if len(tmdb_links) == 1:
tmdbinfo = self.get_tmdb_info(
tmdbinfo = self.get_info(
mtype=MediaType.TV if tmdb_links[0].startswith("/tv") else MediaType.MOVIE,
tmdbid=tmdb_links[0].split("/")[-1])
if tmdbinfo:
@ -525,7 +527,7 @@ class TmdbHelper:
return None
return None
def get_tmdb_info(self,
def get_info(self,
mtype: MediaType,
tmdbid: str) -> dict:
"""
@ -547,11 +549,11 @@ class TmdbHelper:
# 设置语言
if mtype == MediaType.MOVIE:
tmdb_info = self.__get_tmdb_movie_detail(tmdbid)
tmdb_info = self.__get_movie_detail(tmdbid)
if tmdb_info:
tmdb_info['media_type'] = MediaType.MOVIE
else:
tmdb_info = self.__get_tmdb_tv_detail(tmdbid)
tmdb_info = self.__get_tv_detail(tmdbid)
if tmdb_info:
tmdb_info['media_type'] = MediaType.TV
if tmdb_info:
@ -599,7 +601,7 @@ class TmdbHelper:
else:
tmdb_info['name'] = cn_title
def __get_tmdb_movie_detail(self,
def __get_movie_detail(self,
tmdbid: str,
append_to_response: str = "images,"
"credits,"
@ -711,7 +713,7 @@ class TmdbHelper:
print(str(e))
return None
def __get_tmdb_tv_detail(self,
def __get_tv_detail(self,
tmdbid: str,
append_to_response: str = "images,"
"credits,"
@ -893,3 +895,94 @@ class TmdbHelper:
except Exception as e:
print(str(e))
return None
def get_tv_season_detail(self, tmdbid, season: int):
    """
    Fetch the details of one season of a TV show.

    The result is the TMDB season object: season-level fields such as
    ``air_date``, ``name``, ``overview``, ``poster_path`` and
    ``season_number``, plus an ``episodes`` list whose entries carry e.g.
    ``episode_number``, ``name``, ``overview``, ``air_date``,
    ``still_path``, ``vote_average``, ``crew`` and ``guest_stars``.

    :param tmdbid: TMDB ID
    :param season: season number
    :return: TMDB information, or ``{}`` when unavailable or on error
    """
    if self.season:
        try:
            logger.info("正在查询TMDB电视剧:%s,季:%s ..." % (tmdbid, season))
            season_detail = self.season.details(tmdbid, season)
            return season_detail or {}
        except Exception as e:
            print(str(e))
    return {}
def get_tv_episode_detail(self, tmdbid: str, season: int, episode: int):
    """
    Fetch the details of one episode of a TV show.

    :param tmdbid: TMDB ID
    :param season: season number
    :param episode: episode number
    :return: TMDB information, or ``{}`` when unavailable or on error
    """
    if self.episode:
        try:
            logger.info("正在查询TMDB集图片:%s,季:%s,集:%s ..." % (tmdbid, season, episode))
            episode_detail = self.episode.details(tmdbid, season, episode)
            return episode_detail or {}
        except Exception as e:
            print(str(e))
    return {}

View File

@ -3,6 +3,7 @@ import platform
import re
import shutil
from pathlib import Path
from typing import List
class SystemUtils:
@ -86,7 +87,7 @@ class SystemUtils:
return -1, str(err)
@staticmethod
def list_files_with_extensions(directory: Path, extensions: list) -> list:
def list_files_with_extensions(directory: Path, extensions: list) -> List[Path]:
files = []
pattern = r".*\.(" + "|".join(extensions) + ")$"