split api

This commit is contained in:
jxxghp 2023-06-18 22:31:56 +08:00
parent 66727421da
commit 9b03e44097
6 changed files with 116 additions and 369 deletions

View File

@ -1,6 +1,6 @@
from fastapi import APIRouter
from app.api.endpoints import login, user, site, message, webhook, subscribe, media, douban, search, plugin
from app.api.endpoints import login, user, site, message, webhook, subscribe, media, douban, search, plugin, tmdb
api_router = APIRouter()
api_router.include_router(login.router, tags=["login"])
@ -12,4 +12,5 @@ api_router.include_router(subscribe.router, prefix="/subscribe", tags=["subscrib
api_router.include_router(media.router, prefix="/media", tags=["media"])
api_router.include_router(search.router, prefix="/search", tags=["search"])
api_router.include_router(douban.router, prefix="/douban", tags=["douban"])
api_router.include_router(tmdb.router, prefix="/tmdb", tags=["tmdb"])
api_router.include_router(plugin.router, prefix="/plugin", tags=["plugin"])

View File

@ -1,11 +1,15 @@
from typing import Any
from typing import List, Any
from fastapi import APIRouter, Depends, BackgroundTasks
from fastapi import APIRouter, Depends
from fastapi import BackgroundTasks
from app import schemas
from app.chain.douban import DoubanChain
from app.chain.media import MediaChain
from app.core.context import MediaInfo
from app.db.models.user import User
from app.db.userauth import get_current_active_superuser
from app.db.userauth import get_current_active_user
router = APIRouter()
@ -26,3 +30,52 @@ async def sync_douban(
"""
background_tasks.add_task(start_douban_chain)
return schemas.Response(success=True, message="任务已启动")
@router.get("/doubanid", response_model=schemas.Context)
async def recognize_doubanid(doubanid: str,
_: User = Depends(get_current_active_user)) -> Any:
"""
根据豆瓣ID识别媒体信息
"""
# 识别媒体信息
context = MediaChain().recognize_by_doubanid(doubanid=doubanid)
return context.to_dict()
@router.get("/doubaninfo", response_model=schemas.MediaInfo)
async def douban_info(doubanid: str) -> Any:
"""
根据豆瓣ID查询豆瓣媒体信息
"""
doubaninfo = MediaChain().douban_info(doubanid=doubanid)
if doubaninfo:
return MediaInfo(douban_info=doubaninfo).to_dict()
else:
return schemas.MediaInfo()
@router.get("/doubanmovies", response_model=List[schemas.MediaInfo])
async def douban_movies(sort: str = "R",
tags: str = "",
start: int = 0,
count: int = 30,
_: User = Depends(get_current_active_user)) -> Any:
"""
浏览豆瓣电影信息
"""
movies = MediaChain().douban_movies(sort=sort, tags=tags, start=start, count=count)
return [movie.to_dict() for movie in movies]
@router.get("/doubantvs", response_model=List[schemas.MediaInfo])
async def douban_tvs(sort: str = "R",
tags: str = "",
start: int = 0,
count: int = 30,
_: User = Depends(get_current_active_user)) -> Any:
"""
浏览豆瓣剧集信息
"""
tvs = MediaChain().douban_tvs(sort=sort, tags=tags, start=start, count=count)
return [tv.to_dict() for tv in tvs]

View File

@ -4,10 +4,8 @@ from fastapi import APIRouter, Depends
from app import schemas
from app.chain.media import MediaChain
from app.core.context import MediaInfo
from app.db.models.user import User
from app.db.userauth import get_current_active_user
from app.schemas.types import MediaType
router = APIRouter()
@ -32,97 +30,3 @@ async def search_by_title(title: str,
"""
_, medias = MediaChain().search(title=title)
return [media.to_dict() for media in medias]
@router.get("/doubanid", response_model=schemas.Context)
async def recognize_doubanid(doubanid: str,
_: User = Depends(get_current_active_user)) -> Any:
"""
根据豆瓣ID识别媒体信息
"""
# 识别媒体信息
context = MediaChain().recognize_by_doubanid(doubanid=doubanid)
return context.to_dict()
@router.get("/tmdbinfo", response_model=schemas.MediaInfo)
async def tmdb_info(tmdbid: int, type_name: str) -> Any:
"""
根据TMDBID查询themoviedb媒体信息
"""
mtype = MediaType.MOVIE if type_name == MediaType.MOVIE.value else MediaType.TV
media = MediaChain().recognize_media(tmdbid=tmdbid, mtype=mtype)
if media:
return media.to_dict()
else:
return schemas.MediaInfo()
@router.get("/doubaninfo", response_model=schemas.MediaInfo)
async def douban_info(doubanid: str) -> Any:
"""
根据豆瓣ID查询豆瓣媒体信息
"""
doubaninfo = MediaChain().douban_info(doubanid=doubanid)
if doubaninfo:
return MediaInfo(douban_info=doubaninfo).to_dict()
else:
return schemas.MediaInfo()
@router.get("/tmdbmovies", response_model=List[schemas.MediaInfo])
async def tmdb_movies(sort_by: str = "popularity.desc",
with_genres: str = "",
with_original_language: str = "",
page: int = 1,
_: User = Depends(get_current_active_user)) -> Any:
"""
浏览TMDB电影信息
"""
movies = MediaChain().tmdb_movies(sort_by=sort_by,
with_genres=with_genres,
with_original_language=with_original_language,
page=page)
return [movie.to_dict() for movie in movies]
@router.get("/tmdbtvs", response_model=List[schemas.MediaInfo])
async def tmdb_tvs(sort_by: str = "popularity.desc",
with_genres: str = "",
with_original_language: str = "",
page: int = 1,
_: User = Depends(get_current_active_user)) -> Any:
"""
浏览TMDB剧集信息
"""
tvs = MediaChain().tmdb_tvs(sort_by=sort_by,
with_genres=with_genres,
with_original_language=with_original_language,
page=page)
return [tv.to_dict() for tv in tvs]
@router.get("/doubanmovies", response_model=List[schemas.MediaInfo])
async def douban_movies(sort: str = "R",
tags: str = "",
start: int = 0,
count: int = 30,
_: User = Depends(get_current_active_user)) -> Any:
"""
浏览豆瓣电影信息
"""
movies = MediaChain().douban_movies(sort=sort, tags=tags, start=start, count=count)
return [movie.to_dict() for movie in movies]
@router.get("/doubantvs", response_model=List[schemas.MediaInfo])
async def douban_tvs(sort: str = "R",
tags: str = "",
start: int = 0,
count: int = 30,
_: User = Depends(get_current_active_user)) -> Any:
"""
浏览豆瓣剧集信息
"""
tvs = MediaChain().douban_tvs(sort=sort, tags=tags, start=start, count=count)
return [tv.to_dict() for tv in tvs]

57
app/api/endpoints/tmdb.py Normal file
View File

@ -0,0 +1,57 @@
from typing import List, Any
from fastapi import APIRouter, Depends
from app import schemas
from app.chain.media import MediaChain
from app.core.context import MediaInfo
from app.db.models.user import User
from app.db.userauth import get_current_active_user
from app.schemas.types import MediaType
router = APIRouter()
@router.get("/tmdbinfo", response_model=schemas.MediaInfo)
async def tmdb_info(tmdbid: int, type_name: str) -> Any:
"""
根据TMDBID查询themoviedb媒体信息
"""
mtype = MediaType.MOVIE if type_name == MediaType.MOVIE.value else MediaType.TV
media = MediaChain().recognize_media(tmdbid=tmdbid, mtype=mtype)
if media:
return media.to_dict()
else:
return schemas.MediaInfo()
@router.get("/tmdbmovies", response_model=List[schemas.MediaInfo])
async def tmdb_movies(sort_by: str = "popularity.desc",
with_genres: str = "",
with_original_language: str = "",
page: int = 1,
_: User = Depends(get_current_active_user)) -> Any:
"""
浏览TMDB电影信息
"""
movies = MediaChain().tmdb_movies(sort_by=sort_by,
with_genres=with_genres,
with_original_language=with_original_language,
page=page)
return [movie.to_dict() for movie in movies]
@router.get("/tmdbtvs", response_model=List[schemas.MediaInfo])
async def tmdb_tvs(sort_by: str = "popularity.desc",
with_genres: str = "",
with_original_language: str = "",
page: int = 1,
_: User = Depends(get_current_active_user)) -> Any:
"""
浏览TMDB剧集信息
"""
tvs = MediaChain().tmdb_tvs(sort_by=sort_by,
with_genres=with_genres,
with_original_language=with_original_language,
page=page)
return [tv.to_dict() for tv in tvs]

View File

@ -1,13 +1,5 @@
from abc import abstractmethod, ABCMeta
from pathlib import Path
from typing import Optional, List, Tuple, Union, Set, Any, Dict
from ruamel.yaml import CommentedMap
from app.core.context import MediaInfo, TorrentInfo, Context
from app.core.meta import MetaBase
from app.schemas import TransferInfo, TransferTorrent, ExistMediaInfo, DownloadingTorrent
from app.schemas.types import TorrentStatus, MediaType
from typing import Tuple, Union
class _ModuleBase(metaclass=ABCMeta):
@ -30,263 +22,6 @@ class _ModuleBase(metaclass=ABCMeta):
"""
pass
def prepare_recognize(self, title: str,
subtitle: str = None) -> Tuple[str, str]:
"""
识别前的预处理
:param title: 标题
:param subtitle: 副标题
:return: 处理后的标题副标题该方法可被多个模块同时处理
"""
pass
def recognize_media(self, meta: MetaBase = None,
mtype: MediaType = None,
tmdbid: int = None) -> Optional[MediaInfo]:
"""
识别媒体信息
:param meta: 识别的元数据
:param mtype: 媒体类型与tmdbid配套
:param tmdbid: tmdbid
:return: 识别的媒体信息包括剧集信息
"""
pass
def obtain_image(self, mediainfo: MediaInfo) -> Optional[MediaInfo]:
"""
获取图片
:param mediainfo: 识别的媒体信息
:return: 更新后的媒体信息该方法可被多个模块同时处理
"""
pass
def douban_info(self, doubanid: str) -> Optional[dict]:
"""
获取豆瓣信息
:param doubanid: 豆瓣ID
:return: 识别的媒体信息
"""
pass
def douban_discover(self, mtype: MediaType, sort: str, tags: str,
start: int = 0, count: int = 30) -> Optional[List[dict]]:
"""
发现豆瓣电影剧集
:param mtype: 媒体类型
:param sort: 排序方式
:param tags: 标签
:param start: 起始位置
:param count: 数量
:return: 媒体信息列表
"""
pass
def tmdb_discover(self, mtype: MediaType, sort_by: str, with_genres: str, with_original_language: str,
page: int = 1) -> Optional[List[dict]]:
"""
:param mtype: 媒体类型
:param sort_by: 排序方式
:param with_genres: 类型
:param with_original_language: 语言
:param page: 页码
:return: 媒体信息列表
"""
pass
def tvdb_info(self, tvdbid: int) -> Optional[dict]:
"""
获取TVDB信息
:param tvdbid: int
:return: 识别的媒体信息包括剧集信息
"""
pass
def message_parser(self, body: Any, form: Any, args: Any) -> Optional[dict]:
"""
解析消息内容返回字典注意以下约定值
userid: 用户ID
username: 用户名
text: 内容
:param body: 请求体
:param form: 表单
:param args: 参数
:return: 消息内容用户ID
"""
pass
def webhook_parser(self, body: Any, form: Any, args: Any) -> Optional[dict]:
"""
解析Webhook报文体
:param body: 请求体
:param form: 表单
:param args: 参数
:return: 字典解析为消息时需要包含titletextimage
"""
pass
def search_medias(self, meta: MetaBase) -> Optional[List[MediaInfo]]:
"""
搜索媒体信息
:param meta: 识别的元数据
:reutrn: 媒体信息
"""
pass
def media_exists(self, mediainfo: MediaInfo) -> Optional[ExistMediaInfo]:
"""
判断媒体文件是否存在
:param mediainfo: 识别的媒体信息
:return: 如不存在返回None存在时返回信息包括每季已存在所有集{type: movie/tv, seasons: {season: [episodes]}}
"""
pass
def search_torrents(self, mediainfo: Optional[MediaInfo], sites: List[CommentedMap],
keyword: str = None) -> Optional[List[TorrentInfo]]:
"""
搜索站点多个站点需要多线程处理
:param mediainfo: 识别的媒体信息
:param sites: 站点列表
:param keyword: 搜索关键词如有按关键词搜索否则按媒体信息名称搜索
:reutrn: 资源列表
"""
pass
def refresh_torrents(self, sites: List[CommentedMap]) -> Optional[List[TorrentInfo]]:
"""
获取站点最新一页的种子多个站点需要多线程处理
:param sites: 站点列表
:reutrn: 种子资源列表
"""
pass
def filter_torrents(self, torrent_list: List[TorrentInfo],
season_episodes: Dict[int, dict] = None) -> List[TorrentInfo]:
"""
过滤资源
:param torrent_list: 资源列表
:param season_episodes: 过滤的剧集信息
:return: 过滤后的资源列表该方法可被多个模块同时处理
"""
pass
def download(self, torrent_path: Path, cookie: str,
episodes: Set[int] = None) -> Optional[Tuple[Optional[str], str]]:
"""
根据种子文件选择并添加下载任务
:param torrent_path: 种子文件地址
:param cookie: 站点Cookie
:param episodes: 需要下载的集数
:return: 种子Hash
"""
pass
def download_added(self, context: Context, torrent_path: Path) -> None:
"""
添加下载任务后的处理
:param context: 上下文包括识别信息媒体信息种子信息
:param torrent_path: 种子文件地址
:return: None该方法可被多个模块同时处理
"""
pass
def list_torrents(self, status: TorrentStatus = None,
hashs: Union[list, str] = None) -> Optional[List[Union[TransferTorrent, DownloadingTorrent]]]:
"""
获取下载器种子列表
:param status: 种子状态
:param hashs: 种子Hash
:return: 下载器中符合状态的种子列表
"""
pass
def transfer(self, path: Path, mediainfo: MediaInfo) -> Optional[TransferInfo]:
"""
转移一个路径下的文件
:param path: 文件路径
:param mediainfo: 识别的媒体信息
:return: {path, target_path, message}
"""
pass
def transfer_completed(self, hashs: Union[str, list], transinfo: TransferInfo) -> None:
"""
转移完成后的处理
:param hashs: 种子Hash
:param transinfo: 转移信息
:return: None该方法可被多个模块同时处理
"""
pass
def remove_torrents(self, hashs: Union[str, list]) -> bool:
"""
删除下载器种子
:param hashs: 种子Hash
:return: bool
"""
pass
def refresh_mediaserver(self, mediainfo: MediaInfo, file_path: Path) -> Optional[bool]:
"""
刷新媒体库
:param mediainfo: 识别的媒体信息
:param file_path: 文件路径
:return: 成功或失败
"""
pass
def post_message(self, title: str, text: str = None,
image: str = None, userid: Union[str, int] = None) -> Optional[bool]:
"""
发送消息
:param title: 标题
:param text: 内容
:param image: 图片
:param userid: 用户ID
:return: 成功或失败
"""
pass
def post_medias_message(self, title: str, items: List[MediaInfo],
userid: Union[str, int] = None) -> Optional[bool]:
"""
发送媒体信息选择列表
:param title: 标题
:param items: 消息列表
:param userid: 用户ID
:return: 成功或失败
"""
pass
def post_torrents_message(self, title: str, items: List[Context],
mediainfo: MediaInfo,
userid: Union[str, int] = None) -> Optional[bool]:
"""
发送种子信息选择列表
:param title: 标题
:param items: 消息列表
:param mediainfo: 识别的媒体信息
:param userid: 用户ID
:return: 成功或失败
"""
pass
def scrape_metadata(self, path: str, mediainfo: MediaInfo) -> None:
"""
刮削元数据
:param path: 媒体文件路径
:param mediainfo: 识别的媒体信息
:return: None该方法可被多个模块同时处理
"""
pass
def register_commands(self, commands: dict) -> None:
"""
注册命令实现这个函数接收系统可用的命令菜单
:param commands: 命令字典
:return: None该方法可被多个模块同时处理
"""
pass
@abstractmethod
def stop(self) -> None:
"""

View File

@ -171,14 +171,11 @@ class TheMovieDbModule(_ModuleBase):
return [MediaInfo(tmdb_info=info) for info in results]
def scrape_metadata(self, path: Path, mediainfo: MediaInfo,
force_nfo: bool = False, force_pic: bool = False) -> None:
def scrape_metadata(self, path: Path, mediainfo: MediaInfo) -> None:
"""
刮削元数据
:param path: 媒体文件路径
:param mediainfo: 识别的媒体信息
:param force_nfo: 强制刮削nfo
:param force_pic: 强制刮削图片
:return: 成功或失败
"""
if settings.SCRAP_SOURCE != "themoviedb":