use dataclass

This commit is contained in:
jxxghp 2023-07-07 14:57:26 +08:00
parent 00bef31e56
commit 1e1bba93de
3 changed files with 58 additions and 129 deletions

View File

@ -31,9 +31,9 @@ async def exists(media_in: schemas.MediaInfo,
"""
# 媒体信息
mediainfo = MediaInfo()
meta = MetaInfo(title=media_in.title)
if media_in.tmdb_id:
mediainfo.from_dict(media_in.dict())
meta = MetaInfo(title=mediainfo.title)
elif media_in.douban_id:
context = DoubanChain().recognize_by_doubanid(doubanid=media_in.douban_id)
if context:
@ -45,7 +45,7 @@ async def exists(media_in: schemas.MediaInfo,
mediainfo = context.media_info
meta = context.meta_info
# 查询缺失信息
if not mediainfo.tmdb_id:
if not mediainfo or not mediainfo.tmdb_id:
raise HTTPException(status_code=404, detail="媒体信息不存在")
exist_flag, no_exists = DownloadChain().get_no_exists_info(meta=meta, mediainfo=mediainfo)
if mediainfo.type == MediaType.MOVIE:

View File

@ -226,7 +226,7 @@ class SubscribeChain(ChainBase):
continue
# 如果是电视剧过滤掉已经下载的集数
if torrent_mediainfo.type == MediaType.TV:
if self.__check_subscribe_note(subscribe, torrent_meta.episodes):
if self.__check_subscribe_note(subscribe, torrent_meta.episode_list):
logger.info(f'{torrent_info.title} 对应剧集 {torrent_meta.episodes} 已下载过')
continue
matched_contexts.append(context)
@ -356,7 +356,7 @@ class SubscribeChain(ChainBase):
torrent_info = context.torrent_info
# 如果是电视剧过滤掉已经下载的集数
if torrent_mediainfo.type == MediaType.TV:
if self.__check_subscribe_note(subscribe, torrent_meta.episodes):
if self.__check_subscribe_note(subscribe, torrent_meta.episode_list):
logger.info(f'{torrent_info.title} 对应剧集 {torrent_meta.episodes} 已下载过')
continue
# 包含

View File

@ -1,5 +1,6 @@
import re
from typing import Optional, Any, List, Dict
from dataclasses import dataclass, field, asdict
from typing import List, Dict
from app.core.config import settings
from app.core.meta import MetaBase
@ -7,29 +8,30 @@ from app.core.metainfo import MetaInfo
from app.schemas.types import MediaType
@dataclass
class TorrentInfo:
# 站点ID
site: int = None
# 站点名称
site_name: Optional[str] = None
site_name: str = None
# 站点Cookie
site_cookie: Optional[str] = None
site_cookie: str = None
# 站点UA
site_ua: Optional[str] = None
site_ua: str = None
# 站点是否使用代理
site_proxy: bool = False
# 站点优先级
site_order: int = 0
# 种子名称
title: Optional[str] = None
title: str = None
# 种子副标题
description: Optional[str] = None
description: str = None
# IMDB ID
imdbid: str = None
# 种子链接
enclosure: Optional[str] = None
enclosure: str = None
# 详情页面
page_url: Optional[str] = None
page_url: str = None
# 种子大小
size: float = 0
# 做种者
@ -39,31 +41,26 @@ class TorrentInfo:
# 完成者
grabs: int = 0
# 发布时间
pubdate: Optional[str] = None
pubdate: str = None
# 已过时间
date_elapsed: Optional[str] = None
date_elapsed: str = None
# 上传因子
uploadvolumefactor: Optional[float] = None
uploadvolumefactor: float = None
# 下载因子
downloadvolumefactor: Optional[float] = None
downloadvolumefactor: float = None
# HR
hit_and_run: bool = False
# 种子标签
labels: Optional[list] = []
labels: list = field(default_factory=list)
# 种子优先级
pri_order: int = 0
def __init__(self, **kwargs):
self.labels = []
for key, value in kwargs.items():
if hasattr(self, key) and value is not None:
setattr(self, key, value)
def __getattr__(self, attribute):
return None
def __setattr__(self, name: str, value: Any):
self.__dict__[name] = value
@staticmethod
def get_free_string(upload_volume_factor, download_volume_factor):
"""
@ -93,70 +90,66 @@ class TorrentInfo:
"""
返回字典
"""
attributes = [
attr for attr in dir(self)
if not callable(getattr(self, attr)) and not attr.startswith("_")
]
return {
attr: getattr(self, attr) for attr in attributes
}
return asdict(self)
@dataclass
class MediaInfo:
# 类型 电影、电视剧
type: MediaType = None
# 媒体标题
title: Optional[str] = None
title: str = None
# 年份
year: Optional[str] = None
year: str = None
# 季
season: Optional[int] = None
season: int = None
# TMDB ID
tmdb_id: Optional[int] = None
tmdb_id: int = None
# IMDB ID
imdb_id: Optional[str] = None
imdb_id: str = None
# TVDB ID
tvdb_id: Optional[int] = None
tvdb_id: int = None
# 豆瓣ID
douban_id: Optional[str] = None
douban_id: str = None
# 媒体原语种
original_language: Optional[str] = None
original_language: str = None
# 媒体原发行标题
original_title: Optional[str] = None
original_title: str = None
# 媒体发行日期
release_date: Optional[str] = None
release_date: str = None
# 背景图片
backdrop_path: Optional[str] = None
backdrop_path: str = None
# 海报图片
poster_path: Optional[str] = None
poster_path: str = None
# 评分
vote_average: int = 0
# 描述
overview: Optional[str] = None
overview: str = None
# 所有别名和译名
names: Optional[list] = []
names: list = field(default_factory=list)
# 各季的剧集清单信息
seasons: Optional[Dict[int, list]] = {}
seasons: Dict[int, list] = field(default_factory=dict)
# 各季详情
season_info: List[dict] = []
season_info: List[dict] = field(default_factory=list)
# 各季的年份
season_years: Optional[dict] = {}
season_years: dict = field(default_factory=dict)
# 二级分类
category: str = ""
# TMDB INFO
tmdb_info: Optional[dict] = {}
tmdb_info: dict = field(default_factory=dict)
# 豆瓣 INFO
douban_info: Optional[dict] = {}
douban_info: dict = field(default_factory=dict)
# 导演
directors: List[dict] = []
directors: List[dict] = field(default_factory=dict)
# 演员
actors: List[dict] = []
actors: List[dict] = field(default_factory=dict)
def __init__(self, tmdb_info: dict = None, douban_info: dict = None):
# 初始化
self.seasons = {}
self.names = []
self.season_info = []
self.season_years = {}
self.names = []
self.directors = []
self.actors = []
self.tmdb_info = {}
@ -167,12 +160,6 @@ class MediaInfo:
if douban_info:
self.set_douban_info(douban_info)
def __getattr__(self, attribute):
return None
def __setattr__(self, name: str, value: Any):
self.__dict__[name] = value
def from_dict(self, data: dict):
"""
从字典中初始化
@ -327,7 +314,7 @@ class MediaInfo:
self.names = info.get('names') or []
# 剩余属性赋值
for key, value in info.items():
if not hasattr(self.__class__, key):
if not hasattr(self, key):
setattr(self, key, value)
def set_douban_info(self, info: dict):
@ -404,7 +391,7 @@ class MediaInfo:
self.seasons[meta.begin_season] = list(range(1, episodes_count + 1))
# 剩余属性赋值
for key, value in info.items():
if not hasattr(self.__class__, key):
if not hasattr(self, key):
setattr(self, key, value)
@property
@ -478,40 +465,27 @@ class MediaInfo:
overview = (overview[:max_len] + placeholder) if len(overview) > max_len else overview
return overview
def get_season_episodes(self, sea: int) -> list:
"""
返回指定季度的剧集信息
"""
if not self.seasons:
return []
return self.seasons.get(sea) or []
def to_dict(self):
"""
返回字典
"""
attributes = [
attr for attr in dir(self)
if not callable(getattr(self, attr)) and not attr.startswith("_")
]
return {
attr: getattr(self, attr).value
if isinstance(getattr(self, attr), MediaType)
else getattr(self, attr) for attr in attributes
}
dicts = asdict(self)
dicts["type"] = self.type.value if self.type else None
return dicts
@dataclass
class Context:
"""
上下文对象
"""
# 识别信息
_meta_info: Optional[MetaBase] = None
meta_info: MetaBase = None
# 种子信息
_torrent_info: Optional[TorrentInfo] = None
torrent_info: TorrentInfo = None
# 媒体信息
_media_info: Optional[MediaInfo] = None
media_info: MediaInfo = None
def __init__(self,
meta: MetaBase = None,
@ -519,62 +493,17 @@ class Context:
torrentinfo: TorrentInfo = None,
**kwargs):
if meta:
self._meta_info = meta
self.meta_info = meta
if mediainfo:
self._media_info = mediainfo
self.media_info = mediainfo
if torrentinfo:
self._torrent_info = torrentinfo
self.torrent_info = torrentinfo
if kwargs:
for k, v in kwargs.items():
setattr(self, k, v)
@property
def meta_info(self):
return self._meta_info
def set_meta_info(self, title: str, subtitle: str = None):
self._meta_info = MetaInfo(title, subtitle)
@property
def media_info(self):
return self._media_info
def set_media_info(self,
tmdb_info: dict = None,
douban_info: dict = None):
self._media_info = MediaInfo(tmdb_info, douban_info)
@property
def torrent_info(self):
return self._torrent_info
def set_torrent_info(self, info: dict):
self._torrent_info = TorrentInfo(**info)
def __getattr__(self, attribute):
return None
def __setattr__(self, name: str, value: Any):
self.__dict__[name] = value
def to_dict(self):
"""
转换为字典
"""
def object_to_dict(obj):
attributes = [
attr for attr in dir(obj)
if not callable(getattr(obj, attr)) and not attr.startswith("_")
]
return {
attr: getattr(obj, attr).value
if isinstance(getattr(obj, attr), MediaType)
else getattr(obj, attr) for attr in attributes
}
return {
"meta_info": object_to_dict(self.meta_info),
"media_info": object_to_dict(self.media_info),
"torrent_info": object_to_dict(self.torrent_info)
}
return asdict(self)