feat:媒体信息聚合

This commit is contained in:
jxxghp 2024-04-27 15:20:46 +08:00
parent 26b5ad6a44
commit 7a37078e90
19 changed files with 274 additions and 105 deletions

View File

@ -24,7 +24,7 @@ def calendar(page: int = 1,
return [media.to_dict() for media in medias]
@router.get("/credits/{bangumiid}", summary="查询Bangumi演职员表", response_model=List[schemas.BangumiPerson])
@router.get("/credits/{bangumiid}", summary="查询Bangumi演职员表", response_model=List[schemas.MediaPerson])
def bangumi_credits(bangumiid: int,
page: int = 1,
count: int = 20,
@ -35,7 +35,7 @@ def bangumi_credits(bangumiid: int,
persons = BangumiChain().bangumi_credits(bangumiid, page=page, count=count)
if not persons:
return []
return [schemas.BangumiPerson(**person) for person in persons]
return [schemas.MediaPerson(source='bangumi', **person) for person in persons]
@router.get("/recommend/{bangumiid}", summary="查询Bangumi推荐", response_model=List[schemas.MediaInfo])
@ -51,6 +51,40 @@ def bangumi_recommend(bangumiid: int,
return [media.to_dict() for media in medias]
@router.get("/person/{person_id}", summary="人物详情", response_model=schemas.MediaPerson)
def bangumi_person(person_id: int,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
根据人物ID查询人物详情
"""
personinfo = BangumiChain().person_detail(person_id=person_id)
if not personinfo:
return schemas.MediaPerson(source='bangumi')
else:
return schemas.MediaPerson(source='bangumi', **{
"id": personinfo.get("id"),
"name": personinfo.get("name"),
"images": personinfo.get("images"),
"biography": personinfo.get("summary"),
"birthday": personinfo.get("birth_day"),
"gender": personinfo.get("gender")
})
@router.get("/person/credits/{person_id}", summary="人物参演作品", response_model=List[schemas.MediaInfo])
def bangumi_person_credits(person_id: int,
page: int = 1,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
根据人物ID查询人物参演作品
"""
infos = BangumiChain().person_credits(person_id=person_id, page=page)
if not infos:
return []
else:
return [MediaInfo(bangumi_info=info).to_dict() for info in infos]
@router.get("/{bangumiid}", summary="查询Bangumi详情", response_model=schemas.MediaInfo)
def bangumi_info(bangumiid: int,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:

View File

@ -28,6 +28,43 @@ def douban_img(imgurl: str) -> Any:
return None
@router.get("/person/{person_id}", summary="人物详情", response_model=schemas.MediaPerson)
def douban_person(person_id: int,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
根据人物ID查询人物详情
"""
personinfo = DoubanChain().person_detail(person_id=person_id)
if not personinfo:
return schemas.MediaPerson(source='douban')
else:
also_known_as = []
infos = personinfo.get("extra", {}).get("info")
if infos:
also_known_as = ["".join(info) for info in infos]
return schemas.MediaPerson(source='douban', **{
"id": personinfo.get("id"),
"name": personinfo.get("title"),
"avatar": personinfo.get("cover_img", {}).get("url"),
"biography": personinfo.get("extra", {}).get("short_info"),
"also_known_as": also_known_as,
})
@router.get("/person/credits/{person_id}", summary="人物参演作品", response_model=List[schemas.MediaInfo])
def douban_person_credits(person_id: int,
page: int = 1,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
根据人物ID查询人物参演作品
"""
works = DoubanChain().person_credits(person_id=person_id, page=page)
if not works:
return []
else:
return [MediaInfo(douban_info=work.get("subject")).to_dict() for work in works]
@router.get("/showing", summary="豆瓣正在热映", response_model=List[schemas.MediaInfo])
def movie_showing(page: int = 1,
count: int = 30,
@ -149,13 +186,13 @@ def tv_hot(page: int = 1,
return [MediaInfo(douban_info=tv).to_dict() for tv in tvs]
@router.get("/credits/{doubanid}/{type_name}", summary="豆瓣演员阵容", response_model=List[schemas.DoubanPerson])
@router.get("/credits/{doubanid}/{type_name}", summary="豆瓣演员阵容", response_model=List[schemas.MediaPerson])
def douban_credits(doubanid: str,
type_name: str,
page: int = 1,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
根据TMDBID查询演员阵容type_name: 电影/电视剧
根据豆瓣ID查询演员阵容type_name: 电影/电视剧
"""
mediatype = MediaType(type_name)
if mediatype == MediaType.MOVIE:
@ -167,7 +204,10 @@ def douban_credits(doubanid: str,
if not doubaninfos:
return []
else:
return [schemas.DoubanPerson(**doubaninfo) for doubaninfo in doubaninfos]
# 更新豆瓣演员信息中的ID从URI中提取'douban://douban.com/celebrity/1316132?subject_id=27503705' subject_id
for doubaninfo in doubaninfos:
doubaninfo['id'] = doubaninfo.get('uri', '').split('?subject_id=')[-1]
return [schemas.MediaPerson(source='douban', **doubaninfo) for doubaninfo in doubaninfos]
@router.get("/recommend/{doubanid}/{type_name}", summary="豆瓣推荐电影/电视剧", response_model=List[schemas.MediaInfo])

View File

@ -61,25 +61,6 @@ def exists(title: str = None,
ret_info = {
"id": exist.item_id
}
"""
else:
# 服务器是否存在
mediainfo = MediaInfo()
mediainfo.from_dict({
"title": meta.name,
"year": year or meta.year,
"type": mtype or meta.type,
"tmdb_id": tmdbid,
"season": season
})
exist: schemas.ExistMediaInfo = MediaServerChain().media_exists(
mediainfo=mediainfo
)
if exist:
ret_info = {
"id": exist.itemid
}
"""
return schemas.Response(success=True if exist else False, data={
"item": ret_info
})

View File

@ -63,7 +63,7 @@ def tmdb_recommend(tmdbid: int,
return [MediaInfo(tmdb_info=tmdbinfo).to_dict() for tmdbinfo in tmdbinfos]
@router.get("/credits/{tmdbid}/{type_name}", summary="演员阵容", response_model=List[schemas.TmdbPerson])
@router.get("/credits/{tmdbid}/{type_name}", summary="演员阵容", response_model=List[schemas.MediaPerson])
def tmdb_credits(tmdbid: int,
type_name: str,
page: int = 1,
@ -81,10 +81,10 @@ def tmdb_credits(tmdbid: int,
if not tmdbinfos:
return []
else:
return [schemas.TmdbPerson(**tmdbinfo) for tmdbinfo in tmdbinfos]
return [schemas.MediaPerson(source='themoviedb', **tmdbinfo) for tmdbinfo in tmdbinfos]
@router.get("/person/{person_id}", summary="人物详情", response_model=schemas.TmdbPerson)
@router.get("/person/{person_id}", summary="人物详情", response_model=schemas.MediaPerson)
def tmdb_person(person_id: int,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
@ -92,9 +92,9 @@ def tmdb_person(person_id: int,
"""
tmdbinfo = TmdbChain().person_detail(person_id=person_id)
if not tmdbinfo:
return schemas.TmdbPerson()
return schemas.MediaPerson(source='themoviedb')
else:
return schemas.TmdbPerson(**tmdbinfo)
return schemas.MediaPerson(source='themoviedb', **tmdbinfo)
@router.get("/person/credits/{person_id}", summary="人物参演作品", response_model=List[schemas.MediaInfo])

View File

@ -19,7 +19,7 @@ from app.db.message_oper import MessageOper
from app.helper.message import MessageHelper
from app.log import logger
from app.schemas import TransferInfo, TransferTorrent, ExistMediaInfo, DownloadingTorrent, CommingMessage, Notification, \
WebhookEventInfo, TmdbEpisode, TmdbPerson
WebhookEventInfo, TmdbEpisode, MediaPerson
from app.schemas.types import TorrentStatus, MediaType, MediaImageType, EventType
from app.utils.object import ObjectUtils
@ -260,7 +260,7 @@ class ChainBase(metaclass=ABCMeta):
"""
return self.run_module("search_medias", meta=meta)
def search_persons(self, name: str) -> Optional[List[TmdbPerson]]:
def search_persons(self, name: str) -> Optional[List[MediaPerson]]:
"""
搜索人物信息
:param name: 人物名称

View File

@ -40,3 +40,18 @@ class BangumiChain(ChainBase, metaclass=Singleton):
:param bangumiid: BangumiID
"""
return self.run_module("bangumi_recommend", bangumiid=bangumiid)
def person_detail(self, person_id: int) -> dict:
"""
根据人物ID查询Bangumi人物详情
:param person_id: 人物ID
"""
return self.run_module("bangumi_person_detail", person_id=person_id)
def person_credits(self, person_id: int, page: int = 1) -> List[dict]:
"""
根据人物ID查询人物参演作品
:param person_id: 人物ID
:param page: 页码
"""
return self.run_module("bangumi_person_credits", person_id=person_id, page=page)

View File

@ -11,6 +11,21 @@ class DoubanChain(ChainBase, metaclass=Singleton):
豆瓣处理链单例运行
"""
def person_detail(self, person_id: int) -> dict:
"""
根据人物ID查询豆瓣人物详情
:param person_id: 人物ID
"""
return self.run_module("douban_person_detail", person_id=person_id)
def person_credits(self, person_id: int, page: int = 1) -> List[dict]:
"""
根据人物ID查询人物参演作品
:param person_id: 人物ID
:param page: 页码
"""
return self.run_module("douban_person_credits", person_id=person_id, page=page)
def movie_top250(self, page: int = 1, count: int = 30) -> Optional[List[dict]]:
"""
获取豆瓣电影TOP250

View File

@ -10,7 +10,7 @@ from app.core.event import eventmanager, Event
from app.core.meta import MetaBase
from app.core.metainfo import MetaInfo, MetaInfoPath
from app.log import logger
from app.schemas import TmdbPerson
from app.schemas import MediaPerson
from app.schemas.types import EventType, MediaType
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
@ -158,7 +158,7 @@ class MediaChain(ChainBase, metaclass=Singleton):
return Context(meta_info=file_meta, media_info=mediainfo)
def search(self, title: str,
stype: str = "media") -> Tuple[Optional[MetaBase], List[Union[MediaInfo, TmdbPerson]]]:
stype: str = "media") -> Tuple[Optional[MetaBase], List[Union[MediaInfo, MediaPerson]]]:
"""
搜索媒体/人物信息
:param title: 搜索内容

View File

@ -133,6 +133,8 @@ class TorrentInfo:
@dataclass
class MediaInfo:
# 来源themoviedb、douban、bangumi
source: str = None
# 类型 电影、电视剧
type: MediaType = None
# 媒体标题
@ -355,6 +357,8 @@ class MediaInfo:
if not info:
return
# 来源
self.source = "themoviedb"
# 本体
self.tmdb_info = info
# 类型
@ -440,6 +444,8 @@ class MediaInfo:
"""
if not info:
return
# 来源
self.source = "douban"
# 本体
self.douban_info = info
# 豆瓣ID
@ -448,6 +454,8 @@ class MediaInfo:
if not self.type:
if isinstance(info.get('media_type'), MediaType):
self.type = info.get('media_type')
elif info.get("subtype"):
self.type = MediaType.MOVIE if info.get("subtype") == "movie" else MediaType.TV
elif info.get("type"):
self.type = MediaType.MOVIE if info.get("type") == "movie" else MediaType.TV
elif info.get("type_name"):
@ -464,6 +472,8 @@ class MediaInfo:
# 年份
if not self.year:
self.year = info.get("year")[:4] if info.get("year") else None
if not self.year and info.get("extra"):
self.year = info.get("extra").get("year")
# 识别标题中的季
meta = MetaInfo(info.get("title"))
# 季
@ -498,10 +508,18 @@ class MediaInfo:
if not self.poster_path and info.get("cover_url"):
self.poster_path = info.get("cover_url")
if not self.poster_path and info.get("cover"):
self.poster_path = info.get("cover").get("url")
if info.get("cover").get("url"):
self.poster_path = info.get("cover").get("url")
else:
self.poster_path = info.get("cover").get("large", {}).get("url")
# 简介
if not self.overview:
self.overview = info.get("intro") or info.get("card_subtitle") or ""
if not self.overview:
if info.get("extra", {}).get("info"):
extra_info = info.get("extra").get("info")
if extra_info:
self.overview = "".join(["".join(item) for item in extra_info])
# 从简介中提取年份
if self.overview and not self.year:
match = re.search(r'\d{4}', self.overview)
@ -553,6 +571,8 @@ class MediaInfo:
"""
if not info:
return
# 来源
self.source = "bangumi"
# 本体
self.bangumi_info = info
# 豆瓣ID
@ -589,6 +609,8 @@ class MediaInfo:
if not self.poster_path:
if info.get("images"):
self.poster_path = info.get("images", {}).get("large")
if not self.poster_path and info.get("image"):
self.poster_path = info.get("image")
# 简介
if not self.overview:
self.overview = info.get("summary")

View File

@ -79,7 +79,7 @@ class BangumiModule(_ModuleBase):
:param page: 页码
:param count: 数量
"""
persons = self.bangumiapi.persons(bangumiid) or []
persons = self.bangumiapi.credits(bangumiid)
if persons:
return persons[(page - 1) * count: page * count]
else:
@ -90,4 +90,19 @@ class BangumiModule(_ModuleBase):
根据BangumiID查询推荐电影
:param bangumiid: BangumiID
"""
return self.bangumiapi.subjects(bangumiid) or []
return self.bangumiapi.subjects(bangumiid)
def bangumi_person_detail(self, person_id: int) -> dict:
"""
获取人物详细信息
:param person_id: 豆瓣人物ID
"""
return self.bangumiapi.person_detail(person_id)
def bangumi_person_credits(self, person_id: int, page: int = 1) -> List[dict]:
"""
根据TMDBID查询人物参演作品
:param person_id: 人物ID
:param page: 页码
"""
return self.bangumiapi.person_credits(person_id=person_id, page=page)

View File

@ -14,9 +14,11 @@ class BangumiApi(object):
_urls = {
"calendar": "calendar",
"detail": "v0/subjects/%s",
"persons": "v0/subjects/%s/persons",
"credits": "v0/subjects/%s/persons",
"subjects": "v0/subjects/%s/subjects",
"characters": "v0/subjects/%s/characters"
"characters": "v0/subjects/%s/characters",
"person_detail": "v0/persons/%s",
"person_credits": "v0/persons/%s/subjects",
}
_base_url = "https://api.bgm.tv/"
_req = RequestUtils(session=requests.Session())
@ -142,7 +144,7 @@ class BangumiApi(object):
"""
return self.__invoke(self._urls["detail"] % bid, _ts=datetime.strftime(datetime.now(), '%Y%m%d'))
def persons(self, bid: int):
def credits(self, bid: int):
"""
获取番剧人物
"""
@ -163,3 +165,20 @@ class BangumiApi(object):
获取关联条目信息
"""
return self.__invoke(self._urls["subjects"] % bid, _ts=datetime.strftime(datetime.now(), '%Y%m%d'))
def person_detail(self, person_id: int):
"""
获取人物详细信息
"""
return self.__invoke(self._urls["person_detail"] % person_id, _ts=datetime.strftime(datetime.now(), '%Y%m%d'))
def person_credits(self, person_id: int, page: int = 1):
"""
获取人物参演作品
"""
ret_list = []
result = self.__invoke(self._urls["person_credits"] % person_id, _ts=datetime.strftime(datetime.now(), '%Y%m%d'))
if result:
for item in result:
ret_list.append(item)
return ret_list[(page - 1) * 20: page * 20]

View File

@ -13,6 +13,7 @@ from app.modules import _ModuleBase
from app.modules.douban.apiv2 import DoubanApi
from app.modules.douban.douban_cache import DoubanCache
from app.modules.douban.scraper import DoubanScraper
from app.schemas import MediaPerson
from app.schemas.types import MediaType
from app.utils.common import retry
from app.utils.http import RequestUtils
@ -536,10 +537,6 @@ class DoubanModule(_ModuleBase):
:param meta: 识别的元数据
:reutrn: 媒体信息
"""
# 未启用豆瓣搜索时返回None
if settings.RECOGNIZE_SOURCE != "douban":
return None
if not meta.name:
return []
result = self.doubanapi.search(meta.name)
@ -563,6 +560,23 @@ class DoubanModule(_ModuleBase):
media.season = meta.begin_season
return ret_medias
def search_persons(self, name: str) -> Optional[List[MediaPerson]]:
"""
搜索人物信息
"""
if not name:
return []
result = self.doubanapi.person_search(keyword=name)
if result and result.get('items'):
return [MediaPerson(source='douban', **{
'id': item.get('target_id'),
'name': item.get('target', {}).get('title'),
'url': item.get('target', {}).get('url'),
'images': item.get('target', {}).get('cover', {}),
'avatar': item.get('target', {}).get('cover_img', {}).get('url'),
}) for item in result.get('items')]
return []
@retry(Exception, 5, 3, 3, logger=logger)
def match_doubaninfo(self, name: str, imdbid: str = None,
mtype: MediaType = None, year: str = None, season: int = None) -> dict:
@ -804,11 +818,39 @@ class DoubanModule(_ModuleBase):
根据豆瓣ID查询推荐电影
:param doubanid: 豆瓣ID
"""
return self.doubanapi.movie_recommendations(subject_id=doubanid) or []
return self.doubanapi.movie_recommendations(subject_id=doubanid)
def douban_tv_recommend(self, doubanid: str) -> List[dict]:
"""
根据豆瓣ID查询推荐电视剧
:param doubanid: 豆瓣ID
"""
return self.doubanapi.tv_recommendations(subject_id=doubanid) or []
return self.doubanapi.tv_recommendations(subject_id=doubanid)
def douban_person_detail(self, person_id: int) -> dict:
"""
获取人物详细信息
:param person_id: 豆瓣人物ID
"""
return self.doubanapi.person_detail(person_id)
def douban_person_credits(self, person_id: int, page: int = 1) -> List[dict]:
"""
根据TMDBID查询人物参演作品
:param person_id: 人物ID
:param page: 页码
"""
# 获取人物参演作品集
personinfo = self.doubanapi.person_detail(person_id)
if not personinfo:
return []
collection_id = None
for module in personinfo.get("modules"):
if module.get("type") == "work_collections":
collection_id = module.get("payload", {}).get("id")
# 查询作品集内容
if collection_id:
collections = self.doubanapi.person_work(subject_id=collection_id, start=(page - 1) * 20, count=20)
if collections:
return collections.get("works")
return []

View File

@ -488,15 +488,16 @@ class DoubanApi(metaclass=Singleton):
return self.__invoke(self._urls["tv_photos"] % subject_id,
start=start, count=count, _ts=ts)
def person_detail(self, subject_id):
def person_detail(self, subject_id: int):
"""
用户详情
:param subject_id: 人物 id
:return:
"""
return self.__invoke(self._urls["person_detail"] + subject_id)
return self.__invoke(self._urls["person_detail"] + str(subject_id))
def person_work(self, subject_id, start=0, count=20, sort_by="time", collection_title="影视",
def person_work(self, subject_id: int, start: int = 0, count: int = 20, sort_by: str = "time",
collection_title: str = "影视",
ts=datetime.strftime(datetime.now(), '%Y%m%d')):
"""
用户作品集

View File

@ -13,7 +13,7 @@ from app.modules.themoviedb.category import CategoryHelper
from app.modules.themoviedb.scraper import TmdbScraper
from app.modules.themoviedb.tmdb_cache import TmdbCache
from app.modules.themoviedb.tmdbapi import TmdbApi
from app.schemas import TmdbPerson
from app.schemas import MediaPerson
from app.schemas.types import MediaType, MediaImageType
from app.utils.http import RequestUtils
from app.utils.system import SystemUtils
@ -227,10 +227,6 @@ class TheMovieDbModule(_ModuleBase):
:param meta: 识别的元数据
:reutrn: 媒体信息列表
"""
# 未启用时返回None
if settings.RECOGNIZE_SOURCE != "themoviedb":
return None
if not meta.name:
return []
if meta.type == MediaType.UNKNOWN and not meta.year:
@ -262,17 +258,15 @@ class TheMovieDbModule(_ModuleBase):
return medias
return []
def search_persons(self, name: str) -> Optional[List[TmdbPerson]]:
def search_persons(self, name: str) -> Optional[List[MediaPerson]]:
"""
搜索人物信息
"""
if settings.RECOGNIZE_SOURCE != "themoviedb":
return None
if not name:
return []
results = self.tmdb.search_persons(name)
if results:
return [TmdbPerson(**person) for person in results]
return [MediaPerson(source='themoviedb', **person) for person in results]
return []
def scrape_metadata(self, path: Path, mediainfo: MediaInfo, transfer_type: str,

View File

@ -14,5 +14,3 @@ from .message import *
from .tmdb import *
from .transfer import *
from .file import *
from .bangumi import *
from .douban import *

View File

@ -1,12 +0,0 @@
from typing import Optional
from pydantic import BaseModel
class BangumiPerson(BaseModel):
id: Optional[int] = None
name: Optional[str] = None
type: Optional[int] = 1
career: Optional[list] = []
images: Optional[dict] = {}
relation: Optional[str] = None

View File

@ -1,4 +1,4 @@
from typing import Optional, Dict, List
from typing import Optional, Dict, List, Union
from pydantic import BaseModel
@ -63,6 +63,8 @@ class MediaInfo(BaseModel):
"""
识别媒体信息
"""
# 来源themoviedb、douban、bangumi
source: Optional[str] = None
# 类型 电影、电视剧
type: Optional[str] = None
# 媒体标题
@ -228,3 +230,39 @@ class Context(BaseModel):
media_info: Optional[MediaInfo] = None
# 种子信息
torrent_info: Optional[TorrentInfo] = None
class MediaPerson(BaseModel):
"""
媒体人物信息
"""
# 来源themoviedb、douban、bangumi
source: Optional[str] = None
# 公共
id: Optional[int] = None
type: Optional[Union[str, int]] = 1
name: Optional[str] = None
character: Optional[str] = None
images: Optional[dict] = {}
# themoviedb
profile_path: Optional[str] = None
gender: Optional[Union[str, int]] = None
original_name: Optional[str] = None
credit_id: Optional[str] = None
also_known_as: Optional[list] = []
birthday: Optional[str] = None
deathday: Optional[str] = None
imdb_id: Optional[str] = None
known_for_department: Optional[str] = None
place_of_birth: Optional[str] = None
popularity: Optional[float] = None
biography: Optional[str] = None
# douban
roles: Optional[list] = []
title: Optional[str] = None
url: Optional[str] = None
avatar: Optional[Union[str, dict]] = None
latin_name: Optional[str] = None
# bangumi
career: Optional[list] = []
relation: Optional[str] = None

View File

@ -1,14 +0,0 @@
from typing import Optional
from pydantic import BaseModel
class DoubanPerson(BaseModel):
id: Optional[str] = None
name: Optional[str] = None
roles: Optional[list] = []
title: Optional[str] = None
url: Optional[str] = None
character: Optional[str] = None
avatar: Optional[dict] = None
latin_name: Optional[str] = None

View File

@ -30,22 +30,3 @@ class TmdbEpisode(BaseModel):
vote_average: Optional[float] = None
crew: Optional[list] = []
guest_stars: Optional[list] = []
class TmdbPerson(BaseModel):
id: Optional[int] = None
name: Optional[str] = None
character: Optional[str] = None
profile_path: Optional[str] = None
gender: Optional[int] = None
original_name: Optional[str] = None
credit_id: Optional[str] = None
also_known_as: Optional[list] = []
birthday: Optional[str] = None
deathday: Optional[str] = None
imdb_id: Optional[str] = None
known_for_department: Optional[str] = None
place_of_birth: Optional[str] = None
popularity: Optional[float] = None
images: Optional[dict] = {}
biography: Optional[str] = None