This commit is contained in:
jxxghp
2023-06-18 15:59:38 +08:00
parent 5dd7878e1b
commit 104ae59e18
19 changed files with 425 additions and 181 deletions

View File

@ -24,10 +24,31 @@ async def recognize(title: str,
return context.to_dict()
@router.get("/tmdb", response_model=schemas.MediaInfo)
@router.get("/search", response_model=List[schemas.MediaInfo])
async def search_by_title(title: str,
_: User = Depends(get_current_active_user)) -> Any:
"""
模糊搜索媒体信息列表
"""
_, medias = MediaChain().search(title=title)
return [media.to_dict() for media in medias]
@router.get("/doubanid", response_model=schemas.Context)
async def recognize_doubanid(doubanid: str,
_: User = Depends(get_current_active_user)) -> Any:
"""
根据豆瓣ID识别媒体信息
"""
# 识别媒体信息
context = MediaChain().recognize_by_doubanid(doubanid=doubanid)
return context.to_dict()
@router.get("/tmdbinfo", response_model=schemas.MediaInfo)
async def tmdb_info(tmdbid: int, type_name: str) -> Any:
"""
根据TMDBID查询媒体信息
根据TMDBID查询themoviedb媒体信息
"""
mtype = MediaType.MOVIE if type_name == MediaType.MOVIE.value else MediaType.TV
media = MediaChain().recognize_media(tmdbid=tmdbid, mtype=mtype)
@ -37,7 +58,7 @@ async def tmdb_info(tmdbid: int, type_name: str) -> Any:
return schemas.MediaInfo()
@router.get("/douban", response_model=schemas.MediaInfo)
@router.get("/doubaninfo", response_model=schemas.MediaInfo)
async def douban_info(doubanid: str) -> Any:
"""
根据豆瓣ID查询豆瓣媒体信息
@ -49,11 +70,59 @@ async def douban_info(doubanid: str) -> Any:
return schemas.MediaInfo()
@router.get("/search", response_model=List[schemas.MediaInfo])
async def search_by_title(title: str,
@router.get("/tmdbmovies", response_model=List[schemas.MediaInfo])
async def tmdb_movies(sort_by: str = "popularity.desc",
with_genres: str = "",
with_original_language: str = "",
page: int = 1,
_: User = Depends(get_current_active_user)) -> Any:
"""
搜索媒体信息
浏览TMDB电影信息
"""
_, medias = MediaChain().search(title=title)
return [media.to_dict() for media in medias]
movies = MediaChain().tmdb_movies(sort_by=sort_by,
with_genres=with_genres,
with_original_language=with_original_language,
page=page)
return [movie.to_dict() for movie in movies]
@router.get("/tmdbtvs", response_model=List[schemas.MediaInfo])
async def tmdb_tvs(sort_by: str = "popularity.desc",
with_genres: str = "",
with_original_language: str = "",
page: int = 1,
_: User = Depends(get_current_active_user)) -> Any:
"""
浏览TMDB剧集信息
"""
tvs = MediaChain().tmdb_tvs(sort_by=sort_by,
with_genres=with_genres,
with_original_language=with_original_language,
page=page)
return [tv.to_dict() for tv in tvs]
@router.get("/doubanmovies", response_model=List[schemas.MediaInfo])
async def douban_movies(sort: str = "R",
tags: str = "",
start: int = 0,
count: int = 30,
_: User = Depends(get_current_active_user)) -> Any:
"""
浏览豆瓣电影信息
"""
movies = MediaChain().douban_movies(sort=sort, tags=tags, start=start, count=count)
return [movie.to_dict() for movie in movies]
@router.get("/doubantvs", response_model=List[schemas.MediaInfo])
async def douban_tvs(sort: str = "R",
tags: str = "",
start: int = 0,
count: int = 30,
_: User = Depends(get_current_active_user)) -> Any:
"""
浏览豆瓣剧集信息
"""
tvs = MediaChain().douban_tvs(sort=sort, tags=tags, start=start, count=count)
return [tv.to_dict() for tv in tvs]

View File

@ -16,9 +16,19 @@ async def search_by_tmdbid(tmdbid: int,
mtype: str = None,
_: User = Depends(get_current_active_user)) -> Any:
"""
根据TMDBID搜索资源
根据TMDBID精确搜索站点资源
"""
if mtype:
mtype = MediaType.TV if mtype == MediaType.TV.value else MediaType.MOVIE
torrents = SearchChain().search_by_tmdbid(tmdbid=tmdbid, mtype=mtype)
return [torrent.to_dict() for torrent in torrents]
@router.get("/title", response_model=List[schemas.TorrentInfo])
async def search_by_title(title: str,
_: User = Depends(get_current_active_user)) -> Any:
"""
根据名称模糊搜索站点资源
"""
torrents = SearchChain().search_by_title(title=title)
return [torrent.to_dict() for torrent in torrents]

View File

@ -142,3 +142,15 @@ class ChainBase(AbstractSingleton, metaclass=Singleton):
def register_commands(self, commands: dict) -> None:
return self.__run_module("register_commands", commands=commands)
def douban_discover(self, mtype: MediaType, sort: str, tags: str,
start: int = 0, count: int = 30) -> Optional[List[dict]]:
return self.__run_module("douban_discover", mtype=mtype, sort=sort, tags=tags,
start=start, count=count)
def tmdb_discover(self, mtype: MediaType, sort_by: str, with_genres: str,
with_original_language: str, page: int = 1) -> Optional[List[dict]]:
return self.__run_module("tmdb_discover", mtype=mtype,
sort_by=sort_by, with_genres=with_genres,
with_original_language=with_original_language,
page=page)

View File

@ -16,7 +16,7 @@ from app.utils.http import RequestUtils
class CookieCloudChain(ChainBase):
"""
同步站点Cookie
CookieCloud处理链
"""
def __init__(self):

View File

@ -14,7 +14,7 @@ from app.log import logger
class DoubanChain(ChainBase):
"""
同步豆瓣想看数据
豆瓣处理链
"""
_interests_url: str = "https://www.douban.com/feed/people/%s/interests"

View File

@ -14,6 +14,9 @@ from app.utils.string import StringUtils
class DownloadChain(ChainBase):
"""
下载处理链
"""
def __init__(self):
super().__init__()
@ -84,8 +87,8 @@ class DownloadChain(ChainBase):
_folder_name = ""
if not torrent_file:
# 下载种子文件
_torrent_file, _folder_name, _ = self.download_torrent(_torrent, userid=userid)
if not _torrent_file:
torrent_file, _folder_name, _ = self.download_torrent(_torrent, userid=userid)
if not torrent_file:
return
# 添加下载
result: Optional[tuple] = self.download(torrent_path=torrent_file,

View File

@ -5,17 +5,18 @@ from app.core.context import Context, MediaInfo
from app.core.meta import MetaBase
from app.core.metainfo import MetaInfo
from app.log import logger
from app.schemas import MediaType
from app.utils.string import StringUtils
class MediaChain(ChainBase):
"""
识别处理链
媒体信息处理链
"""
def recognize_by_title(self, title: str, subtitle: str = None) -> Optional[Context]:
"""
识别媒体信息
根据主副标题识别媒体信息
"""
logger.info(f'开始识别媒体信息,标题:{title},副标题:{subtitle} ...')
# 识别前预处理
@ -35,6 +36,26 @@ class MediaChain(ChainBase):
# 返回上下文
return Context(meta=metainfo, mediainfo=mediainfo, title=title, subtitle=subtitle)
def recognize_by_doubanid(self, doubanid: str) -> Optional[Context]:
"""
根据豆瓣ID识别媒体信息
"""
logger.info(f'开始识别媒体信息豆瓣ID{doubanid} ...')
# 查询豆瓣信息
doubaninfo = self.douban_info(doubanid=doubanid)
if not doubaninfo:
logger.warn(f'未查询到豆瓣信息豆瓣ID{doubanid}')
return None
meta = MetaInfo(title=doubaninfo.get("original_title") or doubaninfo.get("title"))
# 识别媒体信息
mediainfo: MediaInfo = self.recognize_media(meta=meta)
if not mediainfo:
logger.warn(f'{meta.name} 未识别到TMDB媒体信息')
return Context(meta=meta, mediainfo=MediaInfo(douban_info=doubaninfo))
logger.info(f'{doubanid} 识别到媒体信息:{mediainfo.type.value} {mediainfo.title_year}{meta.season}')
mediainfo.set_douban_info(doubaninfo)
return Context(meta=meta, mediainfo=mediainfo)
def search(self, title: str) -> Tuple[MetaBase, List[MediaInfo]]:
"""
搜索媒体信息
@ -66,3 +87,57 @@ class MediaChain(ChainBase):
logger.info(f"{content} 搜索到 {len(medias)} 条相关媒体信息")
# 识别的元数据,媒体信息列表
return meta, medias
def douban_movies(self, sort: str, tags: str, start: int = 0, count: int = 30) -> List[MediaInfo]:
"""
浏览豆瓣电影列表
"""
logger.info(f'开始获取豆瓣电影列表,排序:{sort},标签:{tags}')
movies = self.douban_discover(mtype=MediaType.MOVIE, sort=sort, tags=tags, start=start, count=count)
if not movies:
logger.warn(f'豆瓣电影列表为空,排序:{sort},标签:{tags}')
return []
return [MediaInfo(douban_info=movie) for movie in movies]
def douban_tvs(self, sort: str, tags: str, start: int = 0, count: int = 30) -> List[MediaInfo]:
"""
浏览豆瓣剧集列表
"""
logger.info(f'开始获取豆瓣剧集列表,排序:{sort},标签:{tags}')
tvs = self.douban_discover(mtype=MediaType.TV, sort=sort, tags=tags, start=start, count=count)
if not tvs:
logger.warn(f'豆瓣剧集列表为空,排序:{sort},标签:{tags}')
return []
return [MediaInfo(douban_info=tv) for tv in tvs]
def tmdb_movies(self, sort_by: str, with_genres: str, with_original_language: str,
page: int = 1) -> List[MediaInfo]:
"""
浏览TMDB电影信息
"""
logger.info(f'开始获取TMDB电影列表排序{sort_by},类型:{with_genres},语言:{with_original_language}')
movies = self.tmdb_discover(mtype=MediaType.MOVIE,
sort_by=sort_by,
with_genres=with_genres,
with_original_language=with_original_language,
page=page)
if not movies:
logger.warn(f'TMDB电影列表为空排序{sort_by},类型:{with_genres},语言:{with_original_language}')
return []
return [MediaInfo(tmdb_info=movie) for movie in movies]
def tmdb_tvs(self, sort_by: str, with_genres: str, with_original_language: str,
page: int = 1) -> List[MediaInfo]:
"""
浏览TMDB剧集信息
"""
logger.info(f'开始获取TMDB剧集列表排序{sort_by},类型:{with_genres},语言:{with_original_language}')
tvs = self.tmdb_discover(mtype=MediaType.TV,
sort_by=sort_by,
with_genres=with_genres,
with_original_language=with_original_language,
page=page)
if not tvs:
logger.warn(f'TMDB剧集列表为空排序{sort_by},类型:{with_genres},语言:{with_original_language}')
return []
return [MediaInfo(tmdb_info=tv) for tv in tvs]

View File

@ -22,7 +22,7 @@ class SearchChain(ChainBase):
def search_by_tmdbid(self, tmdbid: int, mtype: str = None) -> Optional[List[Context]]:
"""
根据TMDB ID搜索资源不过滤本地存在的内容
根据TMDB ID搜索资源精确匹配,但不不过滤本地存在的资源
:param tmdbid: TMDB ID
:param mtype: 媒体,电影 or 电视剧
"""
@ -32,11 +32,30 @@ class SearchChain(ChainBase):
return None
return self.process(mediainfo=mediainfo)
def search_by_title(self, title: str, site_ids: List[int] = None) -> List[TorrentInfo]:
"""
根据标题搜索资源,不识别不过滤,直接返回站点内容
"""
logger.info(f'开始搜索资源,关键词:{title} ...')
# 未开启的站点不搜索
indexer_sites = []
for indexer in self.siteshelper.get_indexers():
if not settings.INDEXER_SITES \
or any([s in indexer.get("domain") for s in settings.INDEXER_SITES.split(',')]):
if site_ids and indexer.get("id") not in site_ids:
continue
indexer_sites.append(indexer)
if not indexer_sites:
logger.warn('未开启任何有效站点,无法搜索资源')
return []
# 搜索
return self.search_torrents(mediainfo=None, sites=indexer_sites, keyword=title)
def process(self, mediainfo: MediaInfo,
keyword: str = None,
no_exists: Dict[int, Dict[int, NotExistMediaInfo]] = None) -> Optional[List[Context]]:
"""
根据媒体信息搜索种子资源
根据媒体信息搜索种子资源精确匹配应用过滤规则同时根据no_exists过滤本地已存在的资源
:param mediainfo: 媒体信息
:param keyword: 搜索关键词
:param no_exists: 缺失的媒体信息

View File

@ -10,7 +10,7 @@ from app.log import logger
class SiteChain(ChainBase):
"""
站点远程管理处理链
站点管理处理链
"""
_siteoper: SiteOper = None

View File

@ -17,7 +17,7 @@ from app.schemas.types import MediaType
class SubscribeChain(ChainBase):
"""
订阅处理链
订阅管理处理链
"""
# 站点最新种子缓存 {站点域名: 种子上下文}

View File

@ -8,7 +8,7 @@ from app.schemas.types import EventType
class WebhookChain(ChainBase):
"""
响应Webhook事件
Webhook处理链
"""
def message(self, body: Any, form: Any, args: Any) -> None:

View File

@ -89,6 +89,18 @@ class TorrentInfo:
"""
return self.get_free_string(self.uploadvolumefactor, self.downloadvolumefactor)
def to_dict(self):
"""
返回字典
"""
attributes = [
attr for attr in dir(self)
if not callable(getattr(self, attr)) and not attr.startswith("_")
]
return {
attr: getattr(self, attr) for attr in attributes
}
class MediaInfo:
# 类型 电影、电视剧
@ -123,6 +135,8 @@ class MediaInfo:
names: Optional[list] = []
# 各季的剧集清单信息
seasons: Optional[Dict[int, list]] = {}
# 各季详情
season_info: List[dict] = []
# 各季的年份
season_years: Optional[dict] = {}
# 二级分类
@ -233,7 +247,10 @@ class MediaInfo:
# 本体
self.tmdb_info = info
# 类型
if isinstance(info.get('media_type'), MediaType):
self.type = info.get('media_type')
else:
self.type = MediaType.MOVIE if info.get("media_type") == "movie" else MediaType.TV
# TMDBID
self.tmdb_id = info.get('id')
if not self.tmdb_id:
@ -270,16 +287,17 @@ class MediaInfo:
self.year = self.release_date[:4]
# 季集信息
if info.get('seasons'):
for season_info in info.get('seasons'):
self.season_info = info.get('seasons')
for seainfo in info.get('seasons'):
# 季
season = season_info.get("season_number")
season = seainfo.get("season_number")
if not season:
continue
# 集
episode_count = season_info.get("episode_count")
episode_count = seainfo.get("episode_count")
self.seasons[season] = list(range(1, episode_count + 1))
# 年份
air_date = season_info.get("air_date")
air_date = seainfo.get("air_date")
if air_date:
self.season_years[season] = air_date[:4]
# 海报
@ -292,6 +310,10 @@ class MediaInfo:
self.directors, self.actors = __directors_actors(info)
# 别名和译名
self.names = info.get('names') or []
# 剩余属性赋值
for key, value in info.items():
if not hasattr(self.__class__, key):
setattr(self, key, value)
def set_douban_info(self, info: dict):
"""
@ -304,7 +326,11 @@ class MediaInfo:
# 豆瓣ID
self.douban_id = str(info.get("id"))
# 类型
if not self.type:
if isinstance(info.get('media_type'), MediaType):
self.type = info.get('media_type')
else:
self.type = MediaType.MOVIE if info.get("type") == "movie" else MediaType.TV
# 标题
if not self.title:
@ -358,6 +384,10 @@ class MediaInfo:
episodes_count = info.get("episodes_count")
if episodes_count:
self.seasons[meta.begin_season] = list(range(1, episodes_count + 1))
# 剩余属性赋值
for key, value in info.items():
if not hasattr(self.__class__, key):
setattr(self, key, value)
@property
def title_year(self):
@ -457,16 +487,6 @@ class Context:
"""
上下文对象
"""
# 识别前的信息
title: Optional[str] = None
subtitle: Optional[str] = None
# 用户信息
userid: Optional[str] = None
username: Optional[str] = None
# 操作类型
action: Optional[str] = None
# 识别信息
_meta_info: Optional[MetaBase] = None

View File

@ -68,6 +68,31 @@ class _ModuleBase(metaclass=ABCMeta):
"""
pass
def douban_discover(self, mtype: MediaType, sort: str, tags: str,
start: int = 0, count: int = 30) -> Optional[List[dict]]:
"""
发现豆瓣电影、剧集
:param mtype: 媒体类型
:param sort: 排序方式
:param tags: 标签
:param start: 起始位置
:param count: 数量
:return: 媒体信息列表
"""
pass
def tmdb_discover(self, mtype: MediaType, sort_by: str, with_genres: str, with_original_language: str,
page: int = 1) -> Optional[List[dict]]:
"""
:param mtype: 媒体类型
:param sort_by: 排序方式
:param with_genres: 类型
:param with_original_language: 语言
:param page: 页码
:return: 媒体信息列表
"""
pass
def tvdb_info(self, tvdbid: int) -> Optional[dict]:
"""
获取TVDB信息

View File

@ -50,44 +50,29 @@ class DoubanModule(_ModuleBase):
if douban_info and celebrities:
douban_info["directors"] = celebrities.get("directors")
douban_info["actors"] = celebrities.get("actors")
return self.__extend_doubaninfo(douban_info)
return douban_info
@staticmethod
def __extend_doubaninfo(doubaninfo: dict):
def douban_discover(self, mtype: MediaType, sort: str, tags: str,
start: int = 0, count: int = 30) -> Optional[List[dict]]:
"""
补充添加豆瓣信息
发现豆瓣电影、剧集
:param mtype: 媒体类型
:param sort: 排序方式
:param tags: 标签
:param start: 起始位置
:param count: 数量
:return: 媒体信息列表
"""
# 类型
if doubaninfo.get("type") == "movie":
doubaninfo['media_type'] = MediaType.MOVIE
elif doubaninfo.get("type") == "tv":
doubaninfo['media_type'] = MediaType.TV
logger.info(f"开始发现豆瓣 {mtype.value} ...")
if mtype == MediaType.MOVIE:
infos = self.doubanapi.movie_recommend(start=start, count=count,
sort=sort, tags=tags)
else:
return doubaninfo
# 评分
rating = doubaninfo.get('rating')
if rating:
doubaninfo['vote_average'] = float(rating.get("value"))
else:
doubaninfo['vote_average'] = 0
# 海报
if doubaninfo.get("type") == "movie":
poster_path = doubaninfo.get('cover', {}).get("url")
if not poster_path:
poster_path = doubaninfo.get('cover_url')
if not poster_path:
poster_path = doubaninfo.get('pic', {}).get("large")
else:
poster_path = doubaninfo.get('pic', {}).get("normal")
if poster_path:
poster_path = poster_path.replace("s_ratio_poster", "m_ratio_poster")
doubaninfo['poster_path'] = poster_path
# 简介
doubaninfo['overview'] = doubaninfo.get("card_subtitle") or ""
return doubaninfo
infos = self.doubanapi.tv_recommend(start=start, count=count,
sort=sort, tags=tags)
if not infos:
return []
return infos.get("items") or []
def search_medias(self, meta: MetaBase) -> Optional[List[MediaInfo]]:
"""
@ -156,67 +141,66 @@ class DoubanModule(_ModuleBase):
if not doubaninfo:
logger.warn(f"未找到 {mediainfo.title} 的豆瓣信息")
break
doubaninfo = self.__extend_doubaninfo(doubaninfo)
# 刮削
self.gen_scraper_files(meta, doubaninfo, file)
self.gen_scraper_files(meta, MediaInfo(douban_info=doubaninfo), file)
except Exception as e:
logger.error(f"刮削文件 {file} 失败,原因:{e}")
logger.info(f"{file} 刮削完成")
def gen_scraper_files(self, meta: MetaBase, doubaninfo: dict, file_path: Path):
def gen_scraper_files(self, meta: MetaBase, mediainfo: MediaInfo, file_path: Path):
"""
生成刮削文件
:param meta: 元数据
:param doubaninfo: 豆瓣信息
:param mediainfo: 媒体信息
:param file_path: 文件路径
"""
try:
# 电影
if meta.type == MediaType.MOVIE:
if mediainfo.type == MediaType.MOVIE:
# 强制或者不已存在时才处理
if not file_path.with_name("movie.nfo").exists() \
and not file_path.with_suffix(".nfo").exists():
# 生成电影描述文件
self.__gen_movie_nfo_file(doubaninfo=doubaninfo,
self.__gen_movie_nfo_file(mediainfo=mediainfo,
file_path=file_path)
# 生成电影图片
self.__save_image(url=doubaninfo.get('poster_path'),
file_path=file_path.with_name(f"poster{Path(doubaninfo.get('poster_path')).suffix}"))
self.__save_image(url=mediainfo.poster_path,
file_path=file_path.with_name(f"poster{Path(mediainfo.poster_path).suffix}"))
# 电视剧
else:
# 不存在时才处理
if not file_path.parent.with_name("tvshow.nfo").exists():
# 根目录描述文件
self.__gen_tv_nfo_file(doubaninfo=doubaninfo,
self.__gen_tv_nfo_file(mediainfo=mediainfo,
dir_path=file_path.parents[1])
# 生成根目录图片
self.__save_image(url=doubaninfo.get('poster_path'),
file_path=file_path.with_name(f"poster{Path(doubaninfo.get('poster_path')).suffix}"))
self.__save_image(url=mediainfo.poster_path,
file_path=file_path.with_name(f"poster{Path(mediainfo.poster_path).suffix}"))
# 季目录NFO
if not file_path.with_name("season.nfo").exists():
self.__gen_tv_season_nfo_file(seasoninfo=doubaninfo,
self.__gen_tv_season_nfo_file(mediainfo=mediainfo,
season=meta.begin_season,
season_path=file_path.parent)
except Exception as e:
logger.error(f"{file_path} 刮削失败:{e}")
@staticmethod
def __gen_common_nfo(doubaninfo: dict, doc, root):
def __gen_common_nfo(mediainfo: MediaInfo, doc, root):
# 添加时间
DomUtils.add_node(doc, root, "dateadded",
time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(time.time())))
# 简介
xplot = DomUtils.add_node(doc, root, "plot")
xplot.appendChild(doc.createCDATASection(doubaninfo.get('overview') or ""))
xplot.appendChild(doc.createCDATASection(mediainfo.overview or ""))
xoutline = DomUtils.add_node(doc, root, "outline")
xoutline.appendChild(doc.createCDATASection(doubaninfo.get('.overview') or ""))
xoutline.appendChild(doc.createCDATASection(mediainfo.overview or ""))
# 导演
for director in doubaninfo.get('directors'):
for director in mediainfo.directors:
DomUtils.add_node(doc, root, "director", director.get("name") or "")
# 演员
for actor in doubaninfo.get('actors'):
for actor in mediainfo.actors:
xactor = DomUtils.add_node(doc, root, "actor")
DomUtils.add_node(doc, xactor, "name", actor.get("name") or "")
DomUtils.add_node(doc, xactor, "type", "Actor")
@ -224,16 +208,16 @@ class DoubanModule(_ModuleBase):
DomUtils.add_node(doc, xactor, "thumb", actor.get('avatar', {}).get('normal'))
DomUtils.add_node(doc, xactor, "profile", actor.get('url'))
# 评分
DomUtils.add_node(doc, root, "rating", doubaninfo.get('vote_average') or "0")
DomUtils.add_node(doc, root, "rating", mediainfo.vote_average or "0")
return doc
def __gen_movie_nfo_file(self,
doubaninfo: dict,
mediainfo: MediaInfo,
file_path: Path):
"""
生成电影的NFO描述文件
:param doubaninfo: 豆瓣信息
:param mediainfo: 豆瓣信息
:param file_path: 电影文件路径
"""
# 开始生成XML
@ -241,22 +225,22 @@ class DoubanModule(_ModuleBase):
doc = minidom.Document()
root = DomUtils.add_node(doc, doc, "movie")
# 公共部分
doc = self.__gen_common_nfo(doubaninfo=doubaninfo,
doc = self.__gen_common_nfo(mediainfo=mediainfo,
doc=doc,
root=root)
# 标题
DomUtils.add_node(doc, root, "title", doubaninfo.get('title') or "")
DomUtils.add_node(doc, root, "title", mediainfo.title or "")
# 年份
DomUtils.add_node(doc, root, "year", doubaninfo.get('year') or "")
DomUtils.add_node(doc, root, "year", mediainfo.year or "")
# 保存
self.__save_nfo(doc, file_path.with_suffix(".nfo"))
def __gen_tv_nfo_file(self,
doubaninfo: dict,
mediainfo: MediaInfo,
dir_path: Path):
"""
生成电视剧的NFO描述文件
:param doubaninfo: 媒体信息
:param mediainfo: 媒体信息
:param dir_path: 电视剧根目录
"""
# 开始生成XML
@ -264,22 +248,22 @@ class DoubanModule(_ModuleBase):
doc = minidom.Document()
root = DomUtils.add_node(doc, doc, "tvshow")
# 公共部分
doc = self.__gen_common_nfo(doubaninfo=doubaninfo,
doc = self.__gen_common_nfo(mediainfo=mediainfo,
doc=doc,
root=root)
# 标题
DomUtils.add_node(doc, root, "title", doubaninfo.get('title') or "")
DomUtils.add_node(doc, root, "title", mediainfo.title or "")
# 年份
DomUtils.add_node(doc, root, "year", doubaninfo.get('year') or "")
DomUtils.add_node(doc, root, "year", mediainfo.year or "")
DomUtils.add_node(doc, root, "season", "-1")
DomUtils.add_node(doc, root, "episode", "-1")
# 保存
self.__save_nfo(doc, dir_path.joinpath("tvshow.nfo"))
def __gen_tv_season_nfo_file(self, seasoninfo: dict, season: int, season_path: Path):
def __gen_tv_season_nfo_file(self, mediainfo: MediaInfo, season: int, season_path: Path):
"""
生成电视剧季的NFO描述文件
:param seasoninfo: TMDB季媒体信息
:param mediainfo: 媒体信息
:param season: 季号
:param season_path: 电视剧季的目录
"""
@ -290,84 +274,21 @@ class DoubanModule(_ModuleBase):
DomUtils.add_node(doc, root, "dateadded", time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
# 简介
xplot = DomUtils.add_node(doc, root, "plot")
xplot.appendChild(doc.createCDATASection(seasoninfo.get("overview") or ""))
xplot.appendChild(doc.createCDATASection(mediainfo.overview or ""))
xoutline = DomUtils.add_node(doc, root, "outline")
xoutline.appendChild(doc.createCDATASection(seasoninfo.get("overview") or ""))
xoutline.appendChild(doc.createCDATASection(mediainfo.overview or ""))
# 标题
DomUtils.add_node(doc, root, "title", "%s" % season)
# 发行日期
DomUtils.add_node(doc, root, "premiered", seasoninfo.get("air_date") or "")
DomUtils.add_node(doc, root, "releasedate", seasoninfo.get("air_date") or "")
DomUtils.add_node(doc, root, "premiered", mediainfo.release_date or "")
DomUtils.add_node(doc, root, "releasedate", mediainfo.release_date or "")
# 发行年份
DomUtils.add_node(doc, root, "year", seasoninfo.get("air_date")[:4] if seasoninfo.get("air_date") else "")
DomUtils.add_node(doc, root, "year", mediainfo.release_date[:4] if mediainfo.release_date else "")
# seasonnumber
DomUtils.add_node(doc, root, "seasonnumber", str(season))
# 保存
self.__save_nfo(doc, season_path.joinpath("season.nfo"))
def __gen_tv_episode_nfo_file(self,
episodeinfo: dict,
season: int,
episode: int,
file_path: Path,
force_nfo: bool = False):
"""
生成电视剧集的NFO描述文件
:param episodeinfo: 集TMDB元数据
:param season: 季号
:param episode: 集号
:param file_path: 集文件的路径
:param force_nfo: 是否强制生成NFO文件
"""
if not force_nfo and file_path.with_suffix(".nfo").exists():
return
# 开始生成集的信息
logger.info(f"正在生成剧集NFO文件{file_path.name}")
doc = minidom.Document()
root = DomUtils.add_node(doc, doc, "episodedetails")
# 添加时间
DomUtils.add_node(doc, root, "dateadded", time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
# TMDBID
uniqueid = DomUtils.add_node(doc, root, "uniqueid", episodeinfo.get("id") or "")
uniqueid.setAttribute("type", "tmdb")
uniqueid.setAttribute("default", "true")
# tmdbid
DomUtils.add_node(doc, root, "tmdbid", episodeinfo.get("id") or "")
# 标题
DomUtils.add_node(doc, root, "title", episodeinfo.get("name") or "%s" % episode)
# 简介
xplot = DomUtils.add_node(doc, root, "plot")
xplot.appendChild(doc.createCDATASection(episodeinfo.get("overview") or ""))
xoutline = DomUtils.add_node(doc, root, "outline")
xoutline.appendChild(doc.createCDATASection(episodeinfo.get("overview") or ""))
# 发布日期
DomUtils.add_node(doc, root, "aired", episodeinfo.get("air_date") or "")
# 年份
DomUtils.add_node(doc, root, "year",
episodeinfo.get("air_date")[:4] if episodeinfo.get("air_date") else "")
# 季
DomUtils.add_node(doc, root, "season", str(season))
# 集
DomUtils.add_node(doc, root, "episode", str(episode))
# 评分
DomUtils.add_node(doc, root, "rating", episodeinfo.get("vote_average") or "0")
# 导演
directors = episodeinfo.get("crew") or []
for director in directors:
if director.get("known_for_department") == "Directing":
xdirector = DomUtils.add_node(doc, root, "director", director.get("name") or "")
xdirector.setAttribute("tmdbid", str(director.get("id") or ""))
# 演员
actors = episodeinfo.get("guest_stars") or []
for actor in actors:
if actor.get("known_for_department") == "Acting":
xactor = DomUtils.add_node(doc, root, "actor")
DomUtils.add_node(doc, xactor, "name", actor.get("name") or "")
DomUtils.add_node(doc, xactor, "type", "Actor")
DomUtils.add_node(doc, xactor, "tmdbid", actor.get("id") or "")
# 保存文件
self.__save_nfo(doc, file_path.with_suffix(".nfo"))
@staticmethod
def __save_image(url: str, file_path: Path):
"""

View File

@ -32,7 +32,7 @@ class QbittorrentModule(_ModuleBase):
:param episodes: 需要下载的集数
:return: 种子Hash错误信息
"""
if not torrent_path.exists():
if not torrent_path or not torrent_path.exists():
return None, f"种子文件不存在:{torrent_path}"
# 生成随机Tag
tag = StringUtils.generate_random_str(10)

View File

@ -192,6 +192,29 @@ class TheMovieDbModule(_ModuleBase):
file_path=file)
logger.info(f"{file} 刮削完成")
def tmdb_discover(self, mtype: MediaType, sort_by: str, with_genres: str, with_original_language: str,
page: int = 1) -> Optional[List[dict]]:
"""
:param mtype: 媒体类型
:param sort_by: 排序方式
:param with_genres: 类型
:param with_original_language: 语言
:param page: 页码
:return: 媒体信息列表
"""
if mtype == MediaType.MOVIE:
return self.tmdb.discover_movies(sort_by=sort_by,
with_genres=with_genres,
with_original_language=with_original_language,
page=page)
elif mtype == MediaType.TV:
return self.tmdb.discover_tvs(sort_by=sort_by,
with_genres=with_genres,
with_original_language=with_original_language,
page=page)
else:
return None
def gen_scraper_files(self, mediainfo: MediaInfo, file_path: Path):
"""
生成刮削文件

View File

@ -5,7 +5,7 @@ from urllib.parse import quote
import zhconv
from lxml import etree
from tmdbv3api import TMDb, Search, Movie, TV, Season, Episode
from tmdbv3api import TMDb, Search, Movie, TV, Season, Episode, Discover
from tmdbv3api.exceptions import TMDbException
from app.core.config import settings
@ -48,6 +48,7 @@ class TmdbHelper:
self.tv = TV()
self.season = Season()
self.episode = Episode()
self.discover = Discover()
def search_multiis(self, title: str) -> List[dict]:
"""
@ -979,3 +980,41 @@ class TmdbHelper:
except Exception as e:
print(str(e))
return {}
def discover_movies(self, **kwargs):
"""
发现电影
:param kwargs:
:return:
"""
if not self.discover:
return []
try:
logger.info(f"正在发现电影:{kwargs}...")
tmdbinfo = self.discover.discover_movies(kwargs)
if tmdbinfo:
for info in tmdbinfo:
info['media_type'] = MediaType.MOVIE
return tmdbinfo or []
except Exception as e:
print(str(e))
return []
def discover_tvs(self, **kwargs):
"""
发现电视剧
:param kwargs:
:return:
"""
if not self.discover:
return []
try:
logger.info(f"正在发现电视剧:{kwargs}...")
tmdbinfo = self.discover.discover_tv_shows(kwargs)
if tmdbinfo:
for info in tmdbinfo:
info['media_type'] = MediaType.TV
return tmdbinfo or []
except Exception as e:
print(str(e))
return []

View File

@ -1,8 +1,7 @@
from .token import Token, TokenPayload
from .user import User, UserCreate, UserInDB, UserUpdate
from .response import Response
from .site import Site
from .subscribe import Subscribe
from .context import Context, MediaInfo, MetaInfo, TransferTorrent, DownloadingTorrent, TransferInfo, ExistMediaInfo, \
NotExistMediaInfo, RefreshMediaItem
from .servarr import RadarrMovie, SonarrSeries
from .token import *
from .user import *
from .response import *
from .site import *
from .subscribe import *
from .context import *
from .servarr import *

View File

@ -1,5 +1,5 @@
from pathlib import Path
from typing import Optional, Dict
from typing import Optional, Dict, List
from pydantic import BaseModel
@ -80,10 +80,39 @@ class MediaInfo(BaseModel):
overview: Optional[str] = None
# 二级分类
category: str = ""
# 季
# 季季集清单
seasons: Dict[int, list] = {}
# 季详情
season_info: List[dict] = []
# 别名和译名
names: list = []
# 演员
actors: list = []
# 导演
directors: list = []
# 其它TMDB属性
adult: bool = False
created_by: list = []
episode_run_time: list = []
genres: list = []
first_air_date: Optional[str] = None
homepage: Optional[str] = None
languages: list = []
last_air_date: Optional[str] = None
networks: list = []
number_of_episodes: int = 0
number_of_seasons: int = 0
origin_country: list = []
original_name: Optional[str] = None
production_companies: list = []
production_countries: list = []
spoken_languages: list = []
status: Optional[str] = None
tagline: Optional[str] = None
vote_count: int = 0
popularity: int = 0
runtime: Optional[int] = None
next_episode_to_air: Optional[str] = None
class TorrentInfo(BaseModel):