988 lines
40 KiB
Python
988 lines
40 KiB
Python
import base64
|
||
import copy
|
||
import datetime
|
||
import json
|
||
import re
|
||
import threading
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Any, List, Dict, Tuple, Optional
|
||
|
||
import pytz
|
||
import zhconv
|
||
from apscheduler.schedulers.background import BackgroundScheduler
|
||
from apscheduler.triggers.cron import CronTrigger
|
||
from requests import RequestException
|
||
|
||
from app.chain.mediaserver import MediaServerChain
|
||
from app.chain.tmdb import TmdbChain
|
||
from app.core.config import settings
|
||
from app.core.event import eventmanager, Event
|
||
from app.core.meta import MetaBase
|
||
from app.log import logger
|
||
from app.modules.emby import Emby
|
||
from app.modules.jellyfin import Jellyfin
|
||
from app.modules.plex import Plex
|
||
from app.plugins import _PluginBase
|
||
from app.schemas import MediaInfo, MediaServerItem
|
||
from app.schemas.types import EventType, MediaType
|
||
from app.utils.common import retry
|
||
from app.utils.http import RequestUtils
|
||
from app.utils.string import StringUtils
|
||
|
||
|
||
class PersonMeta(_PluginBase):
|
||
# 插件名称
|
||
plugin_name = "演职人员刮削"
|
||
# 插件描述
|
||
plugin_desc = "刮削演职人员图片以及中文名称。"
|
||
# 插件图标
|
||
plugin_icon = "actor.png"
|
||
# 主题色
|
||
plugin_color = "#E66E72"
|
||
# 插件版本
|
||
plugin_version = "1.0"
|
||
# 插件作者
|
||
plugin_author = "jxxghp"
|
||
# 作者主页
|
||
author_url = "https://github.com/jxxghp"
|
||
# 插件配置项ID前缀
|
||
plugin_config_prefix = "personmeta_"
|
||
# 加载顺序
|
||
plugin_order = 24
|
||
# 可使用的用户级别
|
||
auth_level = 1
|
||
|
||
# 退出事件
|
||
_event = threading.Event()
|
||
|
||
# 私有属性
|
||
_scheduler = None
|
||
tmdbchain = None
|
||
mschain = None
|
||
_enabled = False
|
||
_onlyonce = False
|
||
_cron = None
|
||
_delay = 0
|
||
_remove_nozh = False
|
||
|
||
def init_plugin(self, config: dict = None):
|
||
self.tmdbchain = TmdbChain()
|
||
self.mschain = MediaServerChain(self.db)
|
||
if config:
|
||
self._enabled = config.get("enabled")
|
||
self._onlyonce = config.get("onlyonce")
|
||
self._cron = config.get("cron")
|
||
self._delay = config.get("delay") or 0
|
||
self._remove_nozh = config.get("remove_nozh") or False
|
||
|
||
# 停止现有任务
|
||
self.stop_service()
|
||
|
||
# 启动服务
|
||
if self._enabled or self._onlyonce:
|
||
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
|
||
if self._cron or self._onlyonce:
|
||
if self._cron:
|
||
try:
|
||
self._scheduler.add_job(func=self.scrap_library,
|
||
trigger=CronTrigger.from_crontab(self._cron),
|
||
name="演职人员刮削")
|
||
logger.info(f"演职人员刮削服务启动,周期:{self._cron}")
|
||
except Exception as e:
|
||
logger.error(f"演职人员刮削服务启动失败,错误信息:{str(e)}")
|
||
self.systemmessage.put(f"演职人员刮削服务启动失败,错误信息:{str(e)}")
|
||
if self._onlyonce:
|
||
self._scheduler.add_job(func=self.scrap_library, trigger='date',
|
||
run_date=datetime.datetime.now(
|
||
tz=pytz.timezone(settings.TZ)) + datetime.timedelta(seconds=3)
|
||
)
|
||
logger.info(f"演职人员刮削服务启动,立即运行一次")
|
||
# 关闭一次性开关
|
||
self._onlyonce = False
|
||
# 保存配置
|
||
self.__update_config()
|
||
|
||
if self._scheduler.get_jobs():
|
||
# 启动服务
|
||
self._scheduler.print_jobs()
|
||
self._scheduler.start()
|
||
|
||
def __update_config(self):
|
||
"""
|
||
更新配置
|
||
"""
|
||
self.update_config({
|
||
"enabled": self._enabled,
|
||
"onlyonce": self._onlyonce,
|
||
"cron": self._cron,
|
||
"delay": self._delay,
|
||
"remove_nozh": self._remove_nozh
|
||
})
|
||
|
||
def get_state(self) -> bool:
|
||
return self._enabled
|
||
|
||
@staticmethod
|
||
def get_command() -> List[Dict[str, Any]]:
|
||
pass
|
||
|
||
def get_api(self) -> List[Dict[str, Any]]:
|
||
pass
|
||
|
||
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
|
||
"""
|
||
拼装插件配置页面,需要返回两块数据:1、页面配置;2、数据结构
|
||
"""
|
||
return [
|
||
{
|
||
'component': 'VForm',
|
||
'content': [
|
||
{
|
||
'component': 'VRow',
|
||
'content': [
|
||
{
|
||
'component': 'VCol',
|
||
'props': {
|
||
'cols': 12,
|
||
'md': 6
|
||
},
|
||
'content': [
|
||
{
|
||
'component': 'VSwitch',
|
||
'props': {
|
||
'model': 'enabled',
|
||
'label': '启用插件',
|
||
}
|
||
}
|
||
]
|
||
},
|
||
{
|
||
'component': 'VCol',
|
||
'props': {
|
||
'cols': 12,
|
||
'md': 6
|
||
},
|
||
'content': [
|
||
{
|
||
'component': 'VSwitch',
|
||
'props': {
|
||
'model': 'onlyonce',
|
||
'label': '立即运行一次',
|
||
}
|
||
}
|
||
]
|
||
}
|
||
]
|
||
},
|
||
{
|
||
'component': 'VRow',
|
||
'content': [
|
||
{
|
||
'component': 'VCol',
|
||
'props': {
|
||
'cols': 12,
|
||
'md': 6
|
||
},
|
||
'content': [
|
||
{
|
||
'component': 'VTextField',
|
||
'props': {
|
||
'model': 'cron',
|
||
'label': '媒体库扫描周期',
|
||
'placeholder': '5位cron表达式'
|
||
}
|
||
}
|
||
]
|
||
},
|
||
{
|
||
'component': 'VCol',
|
||
'props': {
|
||
'cols': 12,
|
||
'md': 6
|
||
},
|
||
'content': [
|
||
{
|
||
'component': 'VTextField',
|
||
'props': {
|
||
'model': 'delay',
|
||
'label': '入库延迟时间(秒)',
|
||
'placeholder': '30'
|
||
}
|
||
}
|
||
]
|
||
}
|
||
]
|
||
},
|
||
{
|
||
'component': 'VRow',
|
||
'content': [
|
||
{
|
||
'component': 'VCol',
|
||
'props': {
|
||
'cols': 12,
|
||
'md': 6
|
||
},
|
||
'content': [
|
||
{
|
||
'component': 'VSwitch',
|
||
'props': {
|
||
'model': 'remove_nozh',
|
||
'label': '删除非中文演员',
|
||
}
|
||
}
|
||
]
|
||
}
|
||
]
|
||
}
|
||
]
|
||
}
|
||
], {
|
||
"enabled": False,
|
||
"onlyonce": False,
|
||
"cron": "",
|
||
"delay": 30,
|
||
"remove_nozh": False
|
||
}
|
||
|
||
def get_page(self) -> List[dict]:
|
||
pass
|
||
|
||
@eventmanager.register(EventType.TransferComplete)
|
||
def scrap_rt(self, event: Event):
|
||
"""
|
||
根据事件实时刮削演员信息
|
||
"""
|
||
if not self._enabled:
|
||
return
|
||
# 事件数据
|
||
mediainfo: MediaInfo = event.event_data.get("mediainfo")
|
||
meta: MetaBase = event.event_data.get("meta")
|
||
if not mediainfo or not meta:
|
||
return
|
||
# 延迟
|
||
if self._delay:
|
||
time.sleep(int(self._delay))
|
||
# 查询媒体服务器中的条目
|
||
existsinfo = self.chain.media_exists(mediainfo=mediainfo)
|
||
if not existsinfo or not existsinfo.itemid:
|
||
logger.warn(f"演职人员刮削 {mediainfo.title_year} 在媒体库中不存在")
|
||
return
|
||
# 查询条目详情
|
||
iteminfo = self.mschain.iteminfo(server=existsinfo.server, item_id=existsinfo.itemid)
|
||
if not iteminfo:
|
||
logger.warn(f"演职人员刮削 {mediainfo.title_year} 条目详情获取失败")
|
||
return
|
||
# 刮削演职人员信息
|
||
self.__update_item(server=existsinfo.server, item=iteminfo,
|
||
mediainfo=mediainfo, season=meta.begin_season)
|
||
|
||
def scrap_library(self):
|
||
"""
|
||
扫描整个媒体库,刮削演员信息
|
||
"""
|
||
# 所有媒体服务器
|
||
if not settings.MEDIASERVER:
|
||
return
|
||
for server in settings.MEDIASERVER.split(","):
|
||
# 扫描所有媒体库
|
||
logger.info(f"开始刮削服务器 {server} 的演员信息 ...")
|
||
for library in self.mschain.librarys(server):
|
||
logger.info(f"开始刮削媒体库 {library.name} 的演员信息 ...")
|
||
for item in self.mschain.items(server, library.id):
|
||
if not item:
|
||
continue
|
||
if not item.item_id:
|
||
continue
|
||
if "Series" not in item.item_type \
|
||
and "Movie" not in item.item_type:
|
||
continue
|
||
if self._event.is_set():
|
||
logger.info(f"演职人员刮削服务停止")
|
||
return
|
||
# 处理条目
|
||
logger.info(f"开始刮削 {item.title} 的演员信息 ...")
|
||
self.__update_item(server=server, item=item)
|
||
logger.info(f"{item.title} 的演员信息刮削完成")
|
||
logger.info(f"媒体库 {library.name} 的演员信息刮削完成")
|
||
logger.info(f"服务器 {server} 的演员信息刮削完成")
|
||
|
||
def __update_peoples(self, server: str, itemid: str, iteminfo: dict, douban_actors):
|
||
# 处理媒体项中的人物信息
|
||
"""
|
||
"People": [
|
||
{
|
||
"Name": "丹尼尔·克雷格",
|
||
"Id": "33625",
|
||
"Role": "James Bond",
|
||
"Type": "Actor",
|
||
"PrimaryImageTag": "bef4f764540f10577f804201d8d27918"
|
||
}
|
||
]
|
||
"""
|
||
peoples = []
|
||
# 更新当前媒体项人物
|
||
for people in iteminfo["People"] or []:
|
||
if self._event.is_set():
|
||
logger.info(f"演职人员刮削服务停止")
|
||
return
|
||
if not people.get("Name"):
|
||
continue
|
||
if StringUtils.is_chinese(people.get("Name")) \
|
||
and StringUtils.is_chinese(people.get("Role")):
|
||
peoples.append(people)
|
||
continue
|
||
info = self.__update_people(server=server, people=people,
|
||
douban_actors=douban_actors)
|
||
if info:
|
||
peoples.append(info)
|
||
elif not self._remove_nozh:
|
||
peoples.append(people)
|
||
# 保存媒体项信息
|
||
if peoples:
|
||
iteminfo["People"] = peoples
|
||
self.set_iteminfo(server=server, itemid=itemid, iteminfo=iteminfo)
|
||
|
||
def __update_item(self, server: str, item: MediaServerItem,
|
||
mediainfo: MediaInfo = None, season: int = None):
|
||
"""
|
||
更新媒体服务器中的条目
|
||
"""
|
||
|
||
def __need_trans_actor(_item):
|
||
# 是否需要处理人物信息
|
||
_peoples = [x for x in _item.get("People", []) if
|
||
(x.get("Name") and not StringUtils.is_chinese(x.get("Name")))
|
||
or (x.get("Role") and not StringUtils.is_chinese(x.get("Role")))]
|
||
if _peoples:
|
||
return True
|
||
return False
|
||
|
||
# 识别媒体信息
|
||
if not mediainfo:
|
||
if not item.tmdbid:
|
||
logger.warn(f"{item.title} 未找到tmdbid,无法识别媒体信息")
|
||
return
|
||
mtype = MediaType.TV if item.item_type in ['Series', 'show'] else MediaType.MOVIE
|
||
mediainfo = self.chain.recognize_media(mtype=mtype, tmdbid=item.tmdbid)
|
||
if not mediainfo:
|
||
logger.warn(f"{item.title} 未识别到媒体信息")
|
||
return
|
||
|
||
# 获取媒体项
|
||
iteminfo = self.get_iteminfo(server=server, itemid=item.item_id)
|
||
if not iteminfo:
|
||
logger.warn(f"{item.title} 未找到媒体项")
|
||
return
|
||
|
||
if __need_trans_actor(iteminfo):
|
||
# 获取豆瓣演员信息
|
||
logger.info(f"开始获取 {item.title} 的豆瓣演员信息 ...")
|
||
douban_actors = self.__get_douban_actors(mediainfo=mediainfo, season=season)
|
||
self.__update_peoples(server=server, itemid=item.item_id, iteminfo=iteminfo, douban_actors=douban_actors)
|
||
else:
|
||
logger.info(f"{item.title} 的人物信息已是中文,无需更新")
|
||
|
||
# 处理季和集人物
|
||
if iteminfo.get("Type") and "Series" in iteminfo["Type"]:
|
||
# 获取季媒体项
|
||
seasons = self.get_items(server=server, parentid=item.item_id, mtype="Season")
|
||
if not seasons:
|
||
logger.warn(f"{item.title} 未找到季媒体项")
|
||
return
|
||
for season in seasons["Items"]:
|
||
# 获取豆瓣演员信息
|
||
season_actors = self.__get_douban_actors(mediainfo=mediainfo, season=season.get("IndexNumber"))
|
||
# 如果是Jellyfin,更新季的人物,Emby/Plex季没有人物
|
||
if server == "jellyfin":
|
||
seasoninfo = self.get_iteminfo(server=server, itemid=season.get("Id"))
|
||
if not seasoninfo:
|
||
logger.warn(f"{item.title} 未找到季媒体项:{season.get('Id')}")
|
||
continue
|
||
|
||
if __need_trans_actor(seasoninfo):
|
||
# 更新季媒体项人物
|
||
self.__update_peoples(server=server, itemid=season.get("Id"), iteminfo=seasoninfo,
|
||
douban_actors=season_actors)
|
||
logger.info(f"季 {seasoninfo.get('Id')} 的人物信息更新完成")
|
||
else:
|
||
logger.info(f"季 {seasoninfo.get('Id')} 的人物信息已是中文,无需更新")
|
||
# 获取集媒体项
|
||
episodes = self.get_items(server=server, parentid=season.get("Id"), mtype="Episode")
|
||
if not episodes:
|
||
logger.warn(f"{item.title} 未找到集媒体项")
|
||
continue
|
||
# 更新集媒体项人物
|
||
for episode in episodes["Items"]:
|
||
# 获取集媒体项详情
|
||
episodeinfo = self.get_iteminfo(server=server, itemid=episode.get("Id"))
|
||
if not episodeinfo:
|
||
logger.warn(f"{item.title} 未找到集媒体项:{episode.get('Id')}")
|
||
continue
|
||
if __need_trans_actor(episodeinfo):
|
||
# 更新集媒体项人物
|
||
self.__update_peoples(server=server, itemid=episode.get("Id"), iteminfo=episodeinfo,
|
||
douban_actors=season_actors)
|
||
logger.info(f"集 {episodeinfo.get('Id')} 的人物信息更新完成")
|
||
else:
|
||
logger.info(f"集 {episodeinfo.get('Id')} 的人物信息已是中文,无需更新")
|
||
|
||
def __update_people(self, server: str, people: dict, douban_actors: list = None) -> Optional[dict]:
|
||
"""
|
||
更新人物信息,返回替换后的人物信息
|
||
"""
|
||
|
||
def __get_peopleid(p: dict) -> Tuple[Optional[str], Optional[str]]:
|
||
"""
|
||
获取人物的TMDBID、IMDBID
|
||
"""
|
||
if not p.get("ProviderIds"):
|
||
return None, None
|
||
peopletmdbid, peopleimdbid = None, None
|
||
if "Tmdb" in p["ProviderIds"]:
|
||
peopletmdbid = p["ProviderIds"]["Tmdb"]
|
||
if "tmdb" in p["ProviderIds"]:
|
||
peopletmdbid = p["ProviderIds"]["tmdb"]
|
||
if "Imdb" in p["ProviderIds"]:
|
||
peopleimdbid = p["ProviderIds"]["Imdb"]
|
||
if "imdb" in p["ProviderIds"]:
|
||
peopleimdbid = p["ProviderIds"]["imdb"]
|
||
return peopletmdbid, peopleimdbid
|
||
|
||
# 返回的人物信息
|
||
ret_people = copy.deepcopy(people)
|
||
|
||
try:
|
||
# 查询媒体库人物详情
|
||
personinfo = self.get_iteminfo(server=server, itemid=people.get("Id"))
|
||
if not personinfo:
|
||
logger.warn(f"未找到人物 {people.get('Name')} 的信息")
|
||
return None
|
||
|
||
# 是否更新标志
|
||
updated_name = False
|
||
updated_overview = False
|
||
update_character = False
|
||
profile_path = None
|
||
|
||
# 从TMDB信息中更新人物信息
|
||
person_tmdbid, person_imdbid = __get_peopleid(personinfo)
|
||
if person_tmdbid:
|
||
person_tmdbinfo = self.tmdbchain.person_detail(int(person_tmdbid))
|
||
if person_tmdbinfo:
|
||
cn_name = self.__get_chinese_name(person_tmdbinfo)
|
||
if cn_name:
|
||
# 更新中文名
|
||
logger.info(f"{people.get('Name')} 从TMDB获取到中文名:{cn_name}")
|
||
personinfo["Name"] = cn_name
|
||
ret_people["Name"] = cn_name
|
||
updated_name = True
|
||
# 更新中文描述
|
||
biography = person_tmdbinfo.get("biography")
|
||
if biography and StringUtils.is_chinese(biography):
|
||
logger.info(f"{people.get('Name')} 从TMDB获取到中文描述")
|
||
personinfo["Overview"] = biography
|
||
updated_overview = True
|
||
# 图片
|
||
profile_path = person_tmdbinfo.get('profile_path')
|
||
if profile_path:
|
||
logger.info(f"{people.get('Name')} 从TMDB获取到图片:{profile_path}")
|
||
profile_path = f"https://{settings.TMDB_IMAGE_DOMAIN}/t/p/original{profile_path}"
|
||
|
||
# 从豆瓣信息中更新人物信息
|
||
"""
|
||
{
|
||
"name": "丹尼尔·克雷格",
|
||
"roles": [
|
||
"演员",
|
||
"制片人",
|
||
"配音"
|
||
],
|
||
"title": "丹尼尔·克雷格(同名)英国,英格兰,柴郡,切斯特影视演员",
|
||
"url": "https://movie.douban.com/celebrity/1025175/",
|
||
"user": null,
|
||
"character": "饰 詹姆斯·邦德 James Bond 007",
|
||
"uri": "douban://douban.com/celebrity/1025175?subject_id=27230907",
|
||
"avatar": {
|
||
"large": "https://qnmob3.doubanio.com/view/celebrity/raw/public/p42588.jpg?imageView2/2/q/80/w/600/h/3000/format/webp",
|
||
"normal": "https://qnmob3.doubanio.com/view/celebrity/raw/public/p42588.jpg?imageView2/2/q/80/w/200/h/300/format/webp"
|
||
},
|
||
"sharing_url": "https://www.douban.com/doubanapp/dispatch?uri=/celebrity/1025175/",
|
||
"type": "celebrity",
|
||
"id": "1025175",
|
||
"latin_name": "Daniel Craig"
|
||
}
|
||
"""
|
||
if douban_actors and (not updated_name
|
||
or not updated_overview
|
||
or not update_character):
|
||
# 从豆瓣演员中匹配中文名称、角色和简介
|
||
for douban_actor in douban_actors:
|
||
if douban_actor.get("latin_name") == people.get("Name") \
|
||
or douban_actor.get("name") == people.get("Name"):
|
||
# 名称
|
||
if not updated_name:
|
||
logger.info(f"{people.get('Name')} 从豆瓣中获取到中文名:{douban_actor.get('name')}")
|
||
personinfo["Name"] = douban_actor.get("name")
|
||
ret_people["Name"] = douban_actor.get("name")
|
||
updated_name = True
|
||
# 描述
|
||
if not updated_overview:
|
||
if douban_actor.get("title"):
|
||
logger.info(f"{people.get('Name')} 从豆瓣中获取到中文描述:{douban_actor.get('title')}")
|
||
personinfo["Overview"] = douban_actor.get("title")
|
||
updated_overview = True
|
||
# 饰演角色
|
||
if not update_character:
|
||
if douban_actor.get("character"):
|
||
# "饰 詹姆斯·邦德 James Bond 007"
|
||
character = re.sub(r"饰\s+", "",
|
||
douban_actor.get("character"))
|
||
character = re.sub("演员", "",
|
||
character)
|
||
if character:
|
||
logger.info(f"{people.get('Name')} 从豆瓣中获取到饰演角色:{character}")
|
||
ret_people["Role"] = character
|
||
update_character = True
|
||
# 图片
|
||
if not profile_path:
|
||
avatar = douban_actor.get("avatar") or {}
|
||
if avatar.get("large"):
|
||
logger.info(f"{people.get('Name')} 从豆瓣中获取到图片:{avatar.get('large')}")
|
||
profile_path = avatar.get("large")
|
||
break
|
||
|
||
# 更新人物图片
|
||
if profile_path:
|
||
logger.info(f"更新人物 {people.get('Name')} 的图片:{profile_path}")
|
||
self.set_item_image(server=server, itemid=people.get("Id"), imageurl=profile_path)
|
||
|
||
# 锁定人物信息
|
||
if updated_name:
|
||
if "Name" not in personinfo["LockedFields"]:
|
||
personinfo["LockedFields"].append("Name")
|
||
if updated_overview:
|
||
if "Overview" not in personinfo["LockedFields"]:
|
||
personinfo["LockedFields"].append("Overview")
|
||
|
||
# 更新人物信息
|
||
if updated_name or updated_overview or update_character:
|
||
logger.info(f"更新人物 {people.get('Name')} 的信息:{personinfo}")
|
||
ret = self.set_iteminfo(server=server, itemid=people.get("Id"), iteminfo=personinfo)
|
||
if ret:
|
||
return ret_people
|
||
else:
|
||
logger.info(f"人物 {people.get('Name')} 未找到中文数据")
|
||
except Exception as err:
|
||
logger.error(f"更新人物信息失败:{err}")
|
||
return None
|
||
|
||
def __get_douban_actors(self, mediainfo: MediaInfo, season: int = None) -> List[dict]:
|
||
"""
|
||
获取豆瓣演员信息
|
||
"""
|
||
# 随机休眠 3-10 秒
|
||
sleep_time = 3 + int(time.time()) % 7
|
||
logger.info(f"随机休眠 {sleep_time}秒 ...")
|
||
time.sleep(sleep_time)
|
||
# 匹配豆瓣信息
|
||
doubaninfo = self.chain.match_doubaninfo(name=mediainfo.title,
|
||
imdbid=mediainfo.imdb_id,
|
||
mtype=mediainfo.type.value,
|
||
year=mediainfo.year,
|
||
season=season)
|
||
# 豆瓣演员
|
||
if doubaninfo:
|
||
doubanid = doubaninfo.get("id")
|
||
if not str(doubanid).isdigit():
|
||
# 从https://api.douban.com/movie/35561680中提取出字
|
||
doubanid = re.search(r"\d+", doubanid).group(0)
|
||
doubanitem = self.chain.douban_info(doubanid) or {}
|
||
return (doubanitem.get("actors") or []) + (doubanitem.get("directors") or [])
|
||
else:
|
||
logger.warn(f"未找到豆瓣信息:{mediainfo.title_year}")
|
||
return []
|
||
|
||
@staticmethod
|
||
def get_iteminfo(server: str, itemid: str) -> dict:
|
||
"""
|
||
获得媒体项详情
|
||
"""
|
||
|
||
def __get_emby_iteminfo() -> dict:
|
||
"""
|
||
获得Emby媒体项详情
|
||
"""
|
||
try:
|
||
url = f'[HOST]emby/Users/[USER]/Items/{itemid}?' \
|
||
f'Fields=ChannelMappingInfo&api_key=[APIKEY]'
|
||
res = Emby().get_data(url=url)
|
||
if res:
|
||
return res.json()
|
||
except Exception as err:
|
||
logger.error(f"获取Emby媒体项详情失败:{err}")
|
||
return {}
|
||
|
||
def __get_jellyfin_iteminfo() -> dict:
|
||
"""
|
||
获得Jellyfin媒体项详情
|
||
"""
|
||
try:
|
||
url = f'[HOST]Users/[USER]/Items/{itemid}?Fields=ChannelMappingInfo&api_key=[APIKEY]'
|
||
res = Jellyfin().get_data(url=url)
|
||
if res:
|
||
result = res.json()
|
||
if result:
|
||
result['FileName'] = Path(result['Path']).name
|
||
return result
|
||
except Exception as err:
|
||
logger.error(f"获取Jellyfin媒体项详情失败:{err}")
|
||
return {}
|
||
|
||
def __get_plex_iteminfo() -> dict:
|
||
"""
|
||
获得Plex媒体项详情
|
||
"""
|
||
iteminfo = {}
|
||
try:
|
||
plexitem = Plex().get_plex().library.fetchItem(ekey=itemid)
|
||
if 'movie' in plexitem.METADATA_TYPE:
|
||
iteminfo['Type'] = 'Movie'
|
||
iteminfo['IsFolder'] = False
|
||
elif 'episode' in plexitem.METADATA_TYPE:
|
||
iteminfo['Type'] = 'Series'
|
||
iteminfo['IsFolder'] = False
|
||
if 'show' in plexitem.TYPE:
|
||
iteminfo['ChildCount'] = plexitem.childCount
|
||
iteminfo['Name'] = plexitem.title
|
||
iteminfo['Id'] = plexitem.key
|
||
iteminfo['ProductionYear'] = plexitem.year
|
||
iteminfo['ProviderIds'] = {}
|
||
for guid in plexitem.guids:
|
||
idlist = str(guid.id).split(sep='://')
|
||
if len(idlist) < 2:
|
||
continue
|
||
iteminfo['ProviderIds'][idlist[0]] = idlist[1]
|
||
for location in plexitem.locations:
|
||
iteminfo['Path'] = location
|
||
iteminfo['FileName'] = Path(location).name
|
||
iteminfo['Overview'] = plexitem.summary
|
||
iteminfo['CommunityRating'] = plexitem.audienceRating
|
||
return iteminfo
|
||
except Exception as err:
|
||
logger.error(f"获取Plex媒体项详情失败:{err}")
|
||
return {}
|
||
|
||
if server == "emby":
|
||
return __get_emby_iteminfo()
|
||
elif server == "jellyfin":
|
||
return __get_jellyfin_iteminfo()
|
||
else:
|
||
return __get_plex_iteminfo()
|
||
|
||
@staticmethod
|
||
def get_items(server: str, parentid: str, mtype: str = None) -> dict:
|
||
"""
|
||
获得媒体的所有子媒体项
|
||
"""
|
||
pass
|
||
|
||
def __get_emby_items() -> dict:
|
||
"""
|
||
获得Emby媒体的所有子媒体项
|
||
"""
|
||
try:
|
||
if parentid:
|
||
url = f'[HOST]emby/Users/[USER]/Items?ParentId={parentid}&api_key=[APIKEY]'
|
||
else:
|
||
url = '[HOST]emby/Users/[USER]/Items?api_key=[APIKEY]'
|
||
res = Emby().get_data(url=url)
|
||
if res:
|
||
return res.json()
|
||
except Exception as err:
|
||
logger.error(f"获取Emby媒体的所有子媒体项失败:{err}")
|
||
return {}
|
||
|
||
def __get_jellyfin_items() -> dict:
|
||
"""
|
||
获得Jellyfin媒体的所有子媒体项
|
||
"""
|
||
try:
|
||
if parentid:
|
||
url = f'[HOST]Users/[USER]/Items?ParentId={parentid}&api_key=[APIKEY]'
|
||
else:
|
||
url = '[HOST]Users/[USER]/Items?api_key=[APIKEY]'
|
||
res = Jellyfin().get_data(url=url)
|
||
if res:
|
||
return res.json()
|
||
except Exception as err:
|
||
logger.error(f"获取Jellyfin媒体的所有子媒体项失败:{err}")
|
||
return {}
|
||
|
||
def __get_plex_items() -> dict:
|
||
"""
|
||
获得Plex媒体的所有子媒体项
|
||
"""
|
||
items = {}
|
||
try:
|
||
plex = Plex().get_plex()
|
||
items['Items'] = []
|
||
if parentid:
|
||
if mtype and 'Season' in mtype:
|
||
plexitem = plex.library.fetchItem(ekey=parentid)
|
||
items['Items'] = []
|
||
for season in plexitem.seasons():
|
||
item = {
|
||
'Name': season.title,
|
||
'Id': season.key,
|
||
'IndexNumber': season.seasonNumber,
|
||
'Overview': season.summary
|
||
}
|
||
items['Items'].append(item)
|
||
elif mtype and 'Episode' in mtype:
|
||
plexitem = plex.library.fetchItem(ekey=parentid)
|
||
items['Items'] = []
|
||
for episode in plexitem.episodes():
|
||
item = {
|
||
'Name': episode.title,
|
||
'Id': episode.key,
|
||
'IndexNumber': episode.episodeNumber,
|
||
'Overview': episode.summary,
|
||
'CommunityRating': episode.audienceRating
|
||
}
|
||
items['Items'].append(item)
|
||
else:
|
||
plexitems = plex.library.sectionByID(sectionID=parentid)
|
||
for plexitem in plexitems.all():
|
||
item = {}
|
||
if 'movie' in plexitem.METADATA_TYPE:
|
||
item['Type'] = 'Movie'
|
||
item['IsFolder'] = False
|
||
elif 'episode' in plexitem.METADATA_TYPE:
|
||
item['Type'] = 'Series'
|
||
item['IsFolder'] = False
|
||
item['Name'] = plexitem.title
|
||
item['Id'] = plexitem.key
|
||
items['Items'].append(item)
|
||
else:
|
||
plexitems = plex.library.sections()
|
||
for plexitem in plexitems:
|
||
item = {}
|
||
if 'Directory' in plexitem.TAG:
|
||
item['Type'] = 'Folder'
|
||
item['IsFolder'] = True
|
||
elif 'movie' in plexitem.METADATA_TYPE:
|
||
item['Type'] = 'Movie'
|
||
item['IsFolder'] = False
|
||
elif 'episode' in plexitem.METADATA_TYPE:
|
||
item['Type'] = 'Series'
|
||
item['IsFolder'] = False
|
||
item['Name'] = plexitem.title
|
||
item['Id'] = plexitem.key
|
||
items['Items'].append(item)
|
||
return items
|
||
except Exception as err:
|
||
logger.error(f"获取Plex媒体的所有子媒体项失败:{err}")
|
||
return {}
|
||
|
||
if server == "emby":
|
||
return __get_emby_items()
|
||
elif server == "jellyfin":
|
||
return __get_jellyfin_items()
|
||
else:
|
||
return __get_plex_items()
|
||
|
||
@staticmethod
|
||
def set_iteminfo(server: str, itemid: str, iteminfo: dict):
|
||
"""
|
||
更新媒体项详情
|
||
"""
|
||
|
||
def __set_emby_iteminfo():
|
||
"""
|
||
更新Emby媒体项详情
|
||
"""
|
||
try:
|
||
res = Emby().post_data(
|
||
url=f'[HOST]emby/Items/{itemid}?api_key=[APIKEY]&reqformat=json',
|
||
data=json.dumps(iteminfo),
|
||
headers={
|
||
"Content-Type": "application/json"
|
||
}
|
||
)
|
||
if res and res.status_code in [200, 204]:
|
||
return True
|
||
else:
|
||
logger.error(f"更新Emby媒体项详情失败,错误码:{res.status_code}")
|
||
return False
|
||
except Exception as err:
|
||
logger.error(f"更新Emby媒体项详情失败:{err}")
|
||
return False
|
||
|
||
def __set_jellyfin_iteminfo():
|
||
"""
|
||
更新Jellyfin媒体项详情
|
||
"""
|
||
try:
|
||
res = Jellyfin().post_data(
|
||
url=f'[HOST]Items/{itemid}?api_key=[APIKEY]',
|
||
data=json.dumps(iteminfo),
|
||
headers={
|
||
"Content-Type": "application/json"
|
||
}
|
||
)
|
||
if res and res.status_code in [200, 204]:
|
||
return True
|
||
else:
|
||
logger.error(f"更新Jellyfin媒体项详情失败,错误码:{res.status_code}")
|
||
return False
|
||
except Exception as err:
|
||
logger.error(f"更新Jellyfin媒体项详情失败:{err}")
|
||
return False
|
||
|
||
def __set_plex_iteminfo():
|
||
"""
|
||
更新Plex媒体项详情
|
||
"""
|
||
try:
|
||
plexitem = Plex().get_plex().library.fetchItem(ekey=itemid)
|
||
if 'CommunityRating' in iteminfo:
|
||
edits = {
|
||
'audienceRating.value': iteminfo['CommunityRating'],
|
||
'audienceRating.locked': 1
|
||
}
|
||
plexitem.edit(**edits)
|
||
plexitem.editTitle(iteminfo['Name']).editSummary(iteminfo['Overview']).reload()
|
||
return True
|
||
except Exception as err:
|
||
logger.error(f"更新Plex媒体项详情失败:{err}")
|
||
return False
|
||
|
||
if server == "emby":
|
||
return __set_emby_iteminfo()
|
||
elif server == "jellyfin":
|
||
return __set_jellyfin_iteminfo()
|
||
else:
|
||
return __set_plex_iteminfo()
|
||
|
||
@staticmethod
|
||
@retry(RequestException, logger=logger)
|
||
def set_item_image(server: str, itemid: str, imageurl: str):
|
||
"""
|
||
更新媒体项图片
|
||
"""
|
||
|
||
def __download_image():
|
||
"""
|
||
下载图片
|
||
"""
|
||
try:
|
||
if "doubanio.com" in imageurl:
|
||
r = RequestUtils(headers={
|
||
'Referer': "https://movie.douban.com/"
|
||
}, ua=settings.USER_AGENT).get_res(url=imageurl, raise_exception=True)
|
||
else:
|
||
r = RequestUtils().get_res(url=imageurl, raise_exception=True)
|
||
if r:
|
||
return base64.b64encode(r.content).decode()
|
||
else:
|
||
logger.info(f"{imageurl} 图片下载失败,请检查网络连通性")
|
||
except Exception as err:
|
||
logger.error(f"下载图片失败:{err}")
|
||
return None
|
||
|
||
def __set_emby_item_image(_base64: str):
|
||
"""
|
||
更新Emby媒体项图片
|
||
"""
|
||
try:
|
||
url = f'[HOST]emby/Items/{itemid}/Images/Primary?api_key=[APIKEY]'
|
||
res = Emby().post_data(
|
||
url=url,
|
||
data=_base64,
|
||
headers={
|
||
"Content-Type": "image/png"
|
||
}
|
||
)
|
||
if res and res.status_code in [200, 204]:
|
||
return True
|
||
else:
|
||
logger.error(f"更新Emby媒体项图片失败,错误码:{res.status_code}")
|
||
return False
|
||
except Exception as result:
|
||
logger.error(f"更新Emby媒体项图片失败:{result}")
|
||
return False
|
||
|
||
def __set_jellyfin_item_image():
|
||
"""
|
||
更新Jellyfin媒体项图片
|
||
# FIXME 改为预下载图片
|
||
"""
|
||
try:
|
||
url = f'[HOST]Items/{itemid}/RemoteImages/Download?' \
|
||
f'Type=Primary&ImageUrl={imageurl}&ProviderName=TheMovieDb&api_key=[APIKEY]'
|
||
res = Jellyfin().post_data(url=url)
|
||
if res and res.status_code in [200, 204]:
|
||
return True
|
||
else:
|
||
logger.error(f"更新Jellyfin媒体项图片失败,错误码:{res.status_code}")
|
||
return False
|
||
except Exception as err:
|
||
logger.error(f"更新Jellyfin媒体项图片失败:{err}")
|
||
return False
|
||
|
||
def __set_plex_item_image():
|
||
"""
|
||
更新Plex媒体项图片
|
||
# FIXME 改为预下载图片
|
||
"""
|
||
try:
|
||
plexitem = Plex().get_plex().library.fetchItem(ekey=itemid)
|
||
plexitem.uploadPoster(url=imageurl)
|
||
return True
|
||
except Exception as err:
|
||
logger.error(f"更新Plex媒体项图片失败:{err}")
|
||
return False
|
||
|
||
if server == "emby":
|
||
# 下载图片获取base64
|
||
image_base64 = __download_image()
|
||
if image_base64:
|
||
return __set_emby_item_image(image_base64)
|
||
elif server == "jellyfin":
|
||
return __set_jellyfin_item_image()
|
||
else:
|
||
return __set_plex_item_image()
|
||
return None
|
||
|
||
@staticmethod
|
||
def __get_chinese_name(personinfo: dict) -> str:
|
||
"""
|
||
获取TMDB别名中的中文名
|
||
"""
|
||
try:
|
||
also_known_as = personinfo.get("also_known_as") or []
|
||
if also_known_as:
|
||
for name in also_known_as:
|
||
if name and StringUtils.is_chinese(name):
|
||
# 使用cn2an将繁体转化为简体
|
||
return zhconv.convert(name, "zh-hans")
|
||
except Exception as err:
|
||
logger.error(f"获取人物中文名失败:{err}")
|
||
return ""
|
||
|
||
def stop_service(self):
|
||
"""
|
||
停止服务
|
||
"""
|
||
try:
|
||
if self._scheduler:
|
||
self._scheduler.remove_all_jobs()
|
||
if self._scheduler.running:
|
||
self._event.set()
|
||
self._scheduler.shutdown()
|
||
self._event.clear()
|
||
self._scheduler = None
|
||
except Exception as e:
|
||
print(str(e))
|