fix bangumi apis

This commit is contained in:
jxxghp 2024-04-27 21:36:42 +08:00
parent 6a71bed821
commit 6d69ac42e5
4 changed files with 64 additions and 65 deletions

View File

@ -17,11 +17,10 @@ def calendar(page: int = 1,
"""
浏览Bangumi每日放送
"""
infos = BangumiChain().calendar(page=page, count=count)
if not infos:
medias = BangumiChain().calendar()
if medias:
return [media.to_dict() for media in medias[(page - 1) * count: page * count]]
return []
medias = [MediaInfo(bangumi_info=info) for info in infos]
return [media.to_dict() for media in medias]
@router.get("/credits/{bangumiid}", summary="查询Bangumi演职员表", response_model=List[schemas.MediaPerson])
@ -32,23 +31,24 @@ def bangumi_credits(bangumiid: int,
"""
查询Bangumi演职员表
"""
persons = BangumiChain().bangumi_credits(bangumiid, page=page, count=count)
if not persons:
persons = BangumiChain().bangumi_credits(bangumiid)
if persons:
return persons[(page - 1) * count: page * count]
return []
return [schemas.MediaPerson(source='bangumi', **person) for person in persons]
@router.get("/recommend/{bangumiid}", summary="查询Bangumi推荐", response_model=List[schemas.MediaInfo])
def bangumi_recommend(bangumiid: int,
page: int = 1,
count: int = 20,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
查询Bangumi推荐
"""
infos = BangumiChain().bangumi_recommend(bangumiid)
if not infos:
medias = BangumiChain().bangumi_recommend(bangumiid)
if medias:
return [media.to_dict() for media in medias[(page - 1) * count: page * count]]
return []
medias = [MediaInfo(bangumi_info=info) for info in infos]
return [media.to_dict() for media in medias]
@router.get("/person/{person_id}", summary="人物详情", response_model=schemas.MediaPerson)
@ -57,18 +57,7 @@ def bangumi_person(person_id: int,
"""
根据人物ID查询人物详情
"""
personinfo = BangumiChain().person_detail(person_id=person_id)
if not personinfo:
return schemas.MediaPerson(source='bangumi')
else:
return schemas.MediaPerson(source='bangumi', **{
"id": personinfo.get("id"),
"name": personinfo.get("name"),
"images": personinfo.get("images"),
"biography": personinfo.get("summary"),
"birthday": personinfo.get("birth_day"),
"gender": personinfo.get("gender")
})
return BangumiChain().person_detail(person_id=person_id)
@router.get("/person/credits/{person_id}", summary="人物参演作品", response_model=List[schemas.MediaInfo])
@ -78,11 +67,10 @@ def bangumi_person_credits(person_id: int,
"""
根据人物ID查询人物参演作品
"""
infos = BangumiChain().person_credits(person_id=person_id, page=page)
if not infos:
medias = BangumiChain().person_credits(person_id=person_id)
if medias:
return [media.to_dict() for media in medias[(page - 1) * 20: page * 20]]
return []
else:
return [MediaInfo(bangumi_info=info).to_dict() for info in infos]
@router.get("/{bangumiid}", summary="查询Bangumi详情", response_model=schemas.MediaInfo)

View File

@ -1,6 +1,8 @@
from typing import Optional, List
from app import schemas
from app.chain import ChainBase
from app.core.context import MediaInfo
from app.utils.singleton import Singleton
@ -9,13 +11,11 @@ class BangumiChain(ChainBase, metaclass=Singleton):
Bangumi处理链单例运行
"""
def calendar(self, page: int = 1, count: int = 30) -> Optional[List[dict]]:
def calendar(self) -> Optional[List[MediaInfo]]:
"""
获取Bangumi每日放送
:param page: 页码
:param count: 每页数量
"""
return self.run_module("bangumi_calendar", page=page, count=count)
return self.run_module("bangumi_calendar")
def bangumi_info(self, bangumiid: int) -> Optional[dict]:
"""
@ -25,33 +25,30 @@ class BangumiChain(ChainBase, metaclass=Singleton):
"""
return self.run_module("bangumi_info", bangumiid=bangumiid)
def bangumi_credits(self, bangumiid: int, page: int = 1, count: int = 20) -> List[dict]:
def bangumi_credits(self, bangumiid: int) -> List[schemas.MediaPerson]:
"""
根据BangumiID查询电影演职员表
:param bangumiid: BangumiID
:param page: 页码
:param count: 数量
"""
return self.run_module("bangumi_credits", bangumiid=bangumiid, page=page, count=count)
return self.run_module("bangumi_credits", bangumiid=bangumiid)
def bangumi_recommend(self, bangumiid: int) -> List[dict]:
def bangumi_recommend(self, bangumiid: int) -> Optional[List[MediaInfo]]:
"""
根据BangumiID查询推荐电影
:param bangumiid: BangumiID
"""
return self.run_module("bangumi_recommend", bangumiid=bangumiid)
def person_detail(self, person_id: int) -> dict:
def person_detail(self, person_id: int) -> Optional[schemas.MediaPerson]:
"""
根据人物ID查询Bangumi人物详情
:param person_id: 人物ID
"""
return self.run_module("bangumi_person_detail", person_id=person_id)
def person_credits(self, person_id: int, page: int = 1) -> List[dict]:
def person_credits(self, person_id: int) -> Optional[List[MediaInfo]]:
"""
根据人物ID查询人物参演作品
:param person_id: 人物ID
:param page: 页码
"""
return self.run_module("bangumi_person_credits", person_id=person_id, page=page)
return self.run_module("bangumi_person_credits", person_id=person_id)

View File

@ -1,5 +1,6 @@
from typing import List, Optional, Tuple, Union
from app import schemas
from app.core.context import MediaInfo
from app.log import logger
from app.modules import _ModuleBase
@ -64,45 +65,58 @@ class BangumiModule(_ModuleBase):
logger.info(f"开始获取Bangumi信息{bangumiid} ...")
return self.bangumiapi.detail(bangumiid)
def bangumi_calendar(self, page: int = 1, count: int = 30) -> Optional[List[dict]]:
def bangumi_calendar(self) -> Optional[List[MediaInfo]]:
"""
获取Bangumi每日放送
:param page: 页码
:param count: 每页数量
"""
return self.bangumiapi.calendar(page, count)
infos = self.bangumiapi.calendar()
if infos:
return [MediaInfo(bangumi_info=info) for info in infos]
return []
def bangumi_credits(self, bangumiid: int, page: int = 1, count: int = 20) -> List[dict]:
def bangumi_credits(self, bangumiid: int) -> List[schemas.MediaPerson]:
"""
根据TMDBID查询电影演职员表
:param bangumiid: BangumiID
:param page: 页码
:param count: 数量
"""
persons = self.bangumiapi.credits(bangumiid)
if persons:
return persons[(page - 1) * count: page * count]
else:
return [schemas.MediaPerson(source='bangumi', **person) for person in persons]
return []
def bangumi_recommend(self, bangumiid: int) -> List[dict]:
def bangumi_recommend(self, bangumiid: int) -> List[MediaInfo]:
"""
根据BangumiID查询推荐电影
:param bangumiid: BangumiID
"""
return self.bangumiapi.subjects(bangumiid)
subjects = self.bangumiapi.subjects(bangumiid)
if subjects:
return [MediaInfo(bangumi_info=subject) for subject in subjects]
return []
def bangumi_person_detail(self, person_id: int) -> dict:
def bangumi_person_detail(self, person_id: int) -> Optional[schemas.MediaPerson]:
"""
获取人物详细信息
:param person_id: 豆瓣人物ID
"""
return self.bangumiapi.person_detail(person_id)
personinfo = self.bangumiapi.person_detail(person_id)
if personinfo:
return schemas.MediaPerson(source='bangumi', **{
"id": personinfo.get("id"),
"name": personinfo.get("name"),
"images": personinfo.get("images"),
"biography": personinfo.get("summary"),
"birthday": personinfo.get("birth_day"),
"gender": personinfo.get("gender")
})
return None
def bangumi_person_credits(self, person_id: int, page: int = 1) -> List[dict]:
def bangumi_person_credits(self, person_id: int) -> List[MediaInfo]:
"""
根据TMDBID查询人物参演作品
:param person_id: 人物ID
:param page: 页码
"""
return self.bangumiapi.person_credits(person_id=person_id, page=page)
credits_info = self.bangumiapi.person_credits(person_id=person_id)
if credits_info:
return [MediaInfo(bangumi_info=credit) for credit in credits_info]
return []

View File

@ -40,7 +40,7 @@ class BangumiApi(object):
print(e)
return None
def calendar(self, page: int = 1, count: int = 30):
def calendar(self):
"""
获取每日放送返回items
"""
@ -136,7 +136,7 @@ class BangumiApi(object):
if result:
for item in result:
ret_list.extend(item.get("items") or [])
return ret_list[(page - 1) * count: page * count]
return ret_list
def detail(self, bid: int):
"""
@ -172,7 +172,7 @@ class BangumiApi(object):
"""
return self.__invoke(self._urls["person_detail"] % person_id, _ts=datetime.strftime(datetime.now(), '%Y%m%d'))
def person_credits(self, person_id: int, page: int = 1):
def person_credits(self, person_id: int):
"""
获取人物参演作品
"""
@ -181,4 +181,4 @@ class BangumiApi(object):
if result:
for item in result:
ret_list.append(item)
return ret_list[(page - 1) * 20: page * 20]
return ret_list