fix images

This commit is contained in:
jxxghp 2023-06-21 14:50:10 +08:00
parent d8378e50a5
commit 33d4a58bab
6 changed files with 101 additions and 19 deletions

View File

@ -12,6 +12,7 @@ from app.core.module import ModuleManager
from app.log import logger from app.log import logger
from app.schemas import TransferInfo, TransferTorrent, ExistMediaInfo, DownloadingTorrent from app.schemas import TransferInfo, TransferTorrent, ExistMediaInfo, DownloadingTorrent
from app.schemas.types import TorrentStatus, MediaType, MediaImageType from app.schemas.types import TorrentStatus, MediaType, MediaImageType
from app.utils.object import ObjectUtils
from app.utils.singleton import AbstractSingleton, Singleton from app.utils.singleton import AbstractSingleton, Singleton
@ -46,18 +47,21 @@ class ChainBase(AbstractSingleton, metaclass=Singleton):
modules = self.modulemanager.get_modules(method) modules = self.modulemanager.get_modules(method)
for module in modules: for module in modules:
try: try:
func = getattr(module, method)
if is_result_empty(result): if is_result_empty(result):
# 返回None第一次执行或者需继续执行下一模块 # 返回None第一次执行或者需继续执行下一模块
result = getattr(module, method)(*args, **kwargs) result = func(*args, **kwargs)
elif ObjectUtils.check_signature(func, result):
# 返回结果与方法签名一致,将结果传入(不能多个模块同时运行的需要通过开关控制)
result = func(result)
elif isinstance(result, list):
# 返回为列表,有多个模块运行结果时进行合并(不能多个模块同时运行的需要通过开关控制)
temp = func(*args, **kwargs)
if isinstance(temp, list):
result.extend(temp)
else: else:
if isinstance(result, list): # 返回结果非列表也非空,则执行一次后跳出
# 返回为列表,有多个模块运行结果时进行合并(不能多个模块同时运行的需要通过开关控制) break
temp = getattr(module, method)(*args, **kwargs)
if isinstance(temp, list):
result.extend(temp)
else:
# 返回结果非列表也非空,则执行一次后跳出
break
except Exception as err: except Exception as err:
logger.error(f"运行模块 {method} 出错:{module.__class__.__name__} - {err}\n{traceback.print_exc()}") logger.error(f"运行模块 {method} 出错:{module.__class__.__name__} - {err}\n{traceback.print_exc()}")
return result return result
@ -105,7 +109,8 @@ class ChainBase(AbstractSingleton, metaclass=Singleton):
:param episode: :param episode:
""" """
return self.run_module("obtain_specific_image", mediaid=mediaid, mtype=mtype, return self.run_module("obtain_specific_image", mediaid=mediaid, mtype=mtype,
image_type=image_type, season=season, episode=episode) image_prefix=image_prefix, image_type=image_type,
season=season, episode=episode)
def douban_info(self, doubanid: str) -> Optional[dict]: def douban_info(self, doubanid: str) -> Optional[dict]:
""" """

View File

@ -182,7 +182,7 @@ class Command(metaclass=Singleton):
try: try:
logger.info(f"用户 {userid} 开始执行:{command.get('description')} ...") logger.info(f"用户 {userid} 开始执行:{command.get('description')} ...")
cmd_data = command['data'] if command.get('data') else {} cmd_data = command['data'] if command.get('data') else {}
args_num = ObjectUtils.has_arguments(command['func']) args_num = ObjectUtils.arguments(command['func'])
if args_num > 0: if args_num > 0:
if cmd_data: if cmd_data:
# 有内置参数直接使用内置参数 # 有内置参数直接使用内置参数

View File

@ -564,6 +564,45 @@ class TheMovieDbModule(_ModuleBase):
""" """
self.cache.save() self.cache.save()
def obtain_images(self, mediainfo: MediaInfo) -> Optional[MediaInfo]:
"""
补充抓取媒体信息图片
:param mediainfo: 识别的媒体信息
:return: 更新后的媒体信息
"""
if mediainfo.logo_path \
and mediainfo.poster_path \
and mediainfo.backdrop_path:
# 没有图片缺失
return mediainfo
# 调用TMDB图片接口
if mediainfo.type == MediaType.MOVIE:
images = self.tmdb.get_movie_images(mediainfo.tmdb_id)
else:
# FIXME tmdbv3api库没有tv.images接口只能取第1季的
images = self.tmdb.get_tv_images(mediainfo.tmdb_id, season=1)
if not images:
return mediainfo
# 背景图
if not mediainfo.backdrop_path:
backdrops = images.get("backdrops")
if backdrops:
backdrops = sorted(backdrops, key=lambda x: x.get("vote_average"), reverse=True)
mediainfo.backdrop_path = backdrops[0].get("file_path")
# 标志
if not mediainfo.logo_path:
logos = images.get("logos")
if logos:
logos = sorted(logos, key=lambda x: x.get("vote_average"), reverse=True)
mediainfo.logo_path = logos[0].get("file_path")
# 海报
if not mediainfo.poster_path:
posters = images.get("posters")
if posters:
posters = sorted(posters, key=lambda x: x.get("vote_average"), reverse=True)
mediainfo.poster_path = posters[0].get("file_path")
return mediainfo
def obtain_specific_image(self, mediaid: Union[str, int], mtype: MediaType, def obtain_specific_image(self, mediaid: Union[str, int], mtype: MediaType,
image_type: MediaImageType, image_prefix: str = "w500", image_type: MediaImageType, image_prefix: str = "w500",
season: int = None, episode: int = None) -> Optional[str]: season: int = None, episode: int = None) -> Optional[str]:

View File

@ -1019,3 +1019,29 @@ class TmdbHelper:
except Exception as e: except Exception as e:
print(str(e)) print(str(e))
return [] return []
def get_movie_images(self, tmdbid: int) -> dict:
"""
获取电影的图片
"""
if not self.movie:
return {}
try:
logger.info(f"正在获取电影图片:{tmdbid}...")
return self.movie.images(tmdbid) or {}
except Exception as e:
print(str(e))
return {}
def get_tv_images(self, tmdbid: int, season: int) -> dict:
"""
获取电视剧的图片
"""
if not self.tv:
return {}
try:
logger.info(f"正在获取电视剧图片:{tmdbid}...")
return self.season.images(tv_id=tmdbid, season_num=season) or {}
except Exception as e:
print(str(e))
return {}

View File

@ -77,5 +77,5 @@ class IpUtils:
try: try:
return ipaddress.ip_address(ip_str.strip()).is_private return ipaddress.ip_address(ip_str.strip()).is_private
except Exception as e: except Exception as e:
print(e) print(str(e))
return False return False

View File

@ -13,19 +13,14 @@ class ObjectUtils:
return str(obj).startswith("{") or str(obj).startswith("[") return str(obj).startswith("{") or str(obj).startswith("[")
@staticmethod @staticmethod
def has_arguments(func: Callable) -> int: def arguments(func: Callable) -> int:
""" """
返回函数的参数个数 返回函数的参数个数
""" """
signature = inspect.signature(func) signature = inspect.signature(func)
parameters = signature.parameters parameters = signature.parameters
parameter_names = list(parameters.keys())
# 排除 self 参数 return len(list(parameters.keys()))
if parameter_names and parameter_names[0] == 'self':
parameter_names = parameter_names[1:]
return len(parameter_names)
@staticmethod @staticmethod
def check_method(func: FunctionType) -> bool: def check_method(func: FunctionType) -> bool:
@ -33,3 +28,20 @@ class ObjectUtils:
检查函数是否已实现 检查函数是否已实现
""" """
return func.__code__.co_code != b'd\x01S\x00' return func.__code__.co_code != b'd\x01S\x00'
@staticmethod
def check_signature(func: FunctionType, *args) -> bool:
"""
检查输出与函数的参数类型是否一致
"""
# 获取函数的参数信息
signature = inspect.signature(func)
parameters = signature.parameters
# 检查输入参数个数和类型是否一致
if len(args) != len(parameters):
return False
for arg, param in zip(args, parameters.values()):
if not isinstance(arg, param.annotation):
return False
return True