Merge remote-tracking branch 'origin/main'

This commit is contained in:
jxxghp
2023-07-22 18:03:07 +08:00
28 changed files with 917 additions and 245 deletions

View File

@ -181,4 +181,4 @@ class DoubanScraper:
return
xml_str = doc.toprettyxml(indent=" ", encoding="utf-8")
file_path.write_bytes(xml_str)
logger.info(f"NFO文件已保存{file_path}")
logger.info(f"NFO文件已保存{file_path}")

View File

@ -1,5 +1,5 @@
from pathlib import Path
from typing import Optional, Tuple, Union, Any
from typing import Optional, Tuple, Union, Any, List, Generator
from app import schemas
from app.core.context import MediaInfo
@ -97,3 +97,50 @@ class EmbyModule(_ModuleBase):
episode_count=media_statistic.get("EpisodeCount") or 0,
user_count=user_count or 0
)
def mediaserver_librarys(self) -> List[schemas.MediaServerLibrary]:
"""
媒体库列表
"""
librarys = self.emby.get_librarys()
if not librarys:
return []
return [schemas.MediaServerLibrary(
server="emby",
id=library.get("id"),
name=library.get("name"),
type=library.get("type"),
path=library.get("path")
) for library in librarys]
def mediaserver_items(self, library_id: str) -> Generator:
"""
媒体库项目列表
"""
items = self.emby.get_items(library_id)
for item in items:
yield schemas.MediaServerItem(
server="emby",
library=item.get("library"),
item_id=item.get("id"),
item_type=item.get("type"),
title=item.get("title"),
original_title=item.get("original_title"),
year=item.get("year"),
tmdbid=item.get("tmdbid"),
imdbid=item.get("imdbid"),
tvdbid=item.get("tvdbid"),
path=item.get("path"),
)
def mediaserver_tv_episodes(self, item_id: Union[str, int]) -> List[schemas.MediaServerSeasonInfo]:
"""
获取剧集信息
"""
seasoninfo = self.emby.get_tv_episodes(item_id=item_id)
if not seasoninfo:
return []
return [schemas.MediaServerSeasonInfo(
season=season,
episodes=episodes
) for season, episodes in seasoninfo.items()]

View File

@ -1,7 +1,7 @@
import json
import re
from pathlib import Path
from typing import List, Optional, Union, Dict
from typing import List, Optional, Union, Dict, Generator
from app.core.config import settings
from app.log import logger
@ -43,7 +43,7 @@ class Emby(metaclass=Singleton):
logger.error(f"连接Library/SelectableMediaFolders 出错:" + str(e))
return []
def get_emby_librarys(self) -> List[dict]:
def __get_emby_librarys(self) -> List[dict]:
"""
获取Emby媒体库列表
"""
@ -61,6 +61,29 @@ class Emby(metaclass=Singleton):
logger.error(f"连接User/Views 出错:" + str(e))
return []
def get_librarys(self):
"""
获取媒体服务器所有媒体库列表
"""
if not self._host or not self._apikey:
return []
libraries = []
for library in self.__get_emby_librarys() or []:
match library.get("CollectionType"):
case "movies":
library_type = MediaType.MOVIE.value
case "tvshows":
library_type = MediaType.TV.value
case _:
continue
libraries.append({
"id": library.get("Id"),
"name": library.get("Name"),
"path": library.get("Path"),
"type": library_type
})
return libraries
def get_user(self, user_name: str = None) -> Optional[Union[str, int]]:
"""
获得管理员用户
@ -269,12 +292,14 @@ class Emby(metaclass=Singleton):
return []
def get_tv_episodes(self,
item_id: str = None,
title: str = None,
year: str = None,
tmdb_id: int = None,
season: int = None) -> Optional[Dict[int, list]]:
"""
根据标题和年份和季返回Emby中的剧集列表
:param item_id: Emby中的ID
:param title: 标题
:param year: 年份
:param tmdb_id: TMDBID
@ -284,16 +309,17 @@ class Emby(metaclass=Singleton):
if not self._host or not self._apikey:
return None
# 电视剧
item_id = self.__get_emby_series_id_by_name(title, year)
if item_id is None:
return None
if not item_id:
return {}
# 验证tmdbid是否相同
item_tmdbid = self.get_iteminfo(item_id).get("ProviderIds", {}).get("Tmdb")
if tmdb_id and item_tmdbid:
if str(tmdb_id) != str(item_tmdbid):
item_id = self.__get_emby_series_id_by_name(title, year)
if item_id is None:
return None
if not item_id:
return {}
# 验证tmdbid是否相同
item_tmdbid = self.get_iteminfo(item_id).get("ProviderIds", {}).get("Tmdb")
if tmdb_id and item_tmdbid:
if str(tmdb_id) != str(item_tmdbid):
return {}
# /Shows/Id/Episodes 查集的信息
if not season:
season = ""
@ -475,6 +501,42 @@ class Emby(metaclass=Singleton):
logger.error(f"连接Items/Id出错" + str(e))
return {}
def get_items(self, parent: str) -> Generator:
"""
获取媒体服务器所有媒体库列表
"""
if not parent:
yield {}
if not self._host or not self._apikey:
yield {}
req_url = "%semby/Users/%s/Items?ParentId=%s&api_key=%s" % (self._host, self._user, parent, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res and res.status_code == 200:
results = res.json().get("Items") or []
for result in results:
if not result:
continue
if result.get("Type") in ["Movie", "Series"]:
item_info = self.get_iteminfo(result.get("Id"))
yield {"id": result.get("Id"),
"library": item_info.get("ParentId"),
"type": item_info.get("Type"),
"title": item_info.get("Name"),
"original_title": item_info.get("OriginalTitle"),
"year": item_info.get("ProductionYear"),
"tmdbid": item_info.get("ProviderIds", {}).get("Tmdb"),
"imdbid": item_info.get("ProviderIds", {}).get("Imdb"),
"tvdbid": item_info.get("ProviderIds", {}).get("Tvdb"),
"path": item_info.get("Path"),
"json": str(item_info)}
elif "Folder" in result.get("Type"):
for item in self.get_items(parent=result.get('Id')):
yield item
except Exception as e:
logger.error(f"连接Users/Items出错" + str(e))
yield {}
def get_webhook_message(self, message_str: str) -> dict:
"""
解析Emby Webhook报文

View File

@ -1,6 +1,6 @@
import json
from pathlib import Path
from typing import Optional, Tuple, Union, Any
from typing import Optional, Tuple, Union, Any, List, Generator
from app import schemas
from app.core.context import MediaInfo
@ -89,3 +89,50 @@ class JellyfinModule(_ModuleBase):
episode_count=media_statistic.get("EpisodeCount") or 0,
user_count=user_count or 0
)
def mediaserver_librarys(self) -> List[schemas.MediaServerLibrary]:
"""
媒体库列表
"""
librarys = self.jellyfin.get_librarys()
if not librarys:
return []
return [schemas.MediaServerLibrary(
server="jellyfin",
id=library.get("id"),
name=library.get("name"),
type=library.get("type"),
path=library.get("path")
) for library in librarys]
def mediaserver_items(self, library_id: str) -> Generator:
"""
媒体库项目列表
"""
items = self.jellyfin.get_items(library_id)
for item in items:
yield schemas.MediaServerItem(
server="jellyfin",
library=item.get("library"),
item_id=item.get("id"),
item_type=item.get("type"),
title=item.get("title"),
original_title=item.get("original_title"),
year=item.get("year"),
tmdbid=item.get("tmdbid"),
imdbid=item.get("imdbid"),
tvdbid=item.get("tvdbid"),
path=item.get("path"),
)
def mediaserver_tv_episodes(self, item_id: Union[str, int]) -> List[schemas.MediaServerSeasonInfo]:
"""
获取剧集信息
"""
seasoninfo = self.jellyfin.get_tv_episodes(item_id=item_id)
if not seasoninfo:
return []
return [schemas.MediaServerSeasonInfo(
season=season,
episodes=episodes
) for season, episodes in seasoninfo.items()]

View File

@ -1,9 +1,10 @@
import json
import re
from typing import List, Union, Optional, Dict
from typing import List, Union, Optional, Dict, Generator
from app.core.config import settings
from app.log import logger
from app.schemas import MediaType
from app.utils.http import RequestUtils
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
@ -22,7 +23,7 @@ class Jellyfin(metaclass=Singleton):
self._user = self.get_user()
self._serverid = self.get_server_id()
def get_jellyfin_librarys(self) -> List[dict]:
def __get_jellyfin_librarys(self) -> List[dict]:
"""
获取Jellyfin媒体库的信息
"""
@ -40,6 +41,29 @@ class Jellyfin(metaclass=Singleton):
logger.error(f"连接Users/Views 出错:" + str(e))
return []
def get_librarys(self):
"""
获取媒体服务器所有媒体库列表
"""
if not self._host or not self._apikey:
return []
libraries = []
for library in self.__get_jellyfin_librarys() or []:
match library.get("CollectionType"):
case "movies":
library_type = MediaType.MOVIE.value
case "tvshows":
library_type = MediaType.TV.value
case _:
continue
libraries.append({
"id": library.get("Id"),
"name": library.get("Name"),
"path": library.get("Path"),
"type": library_type
})
return libraries
def get_user_count(self) -> int:
"""
获得用户数量
@ -243,12 +267,14 @@ class Jellyfin(metaclass=Singleton):
return []
def get_tv_episodes(self,
item_id: str = None,
title: str = None,
year: str = None,
tmdb_id: int = None,
season: int = None) -> Optional[Dict[str, list]]:
season: int = None) -> Optional[Dict[int, list]]:
"""
根据标题和年份和季返回Jellyfin中的剧集列表
:param item_id: Jellyfin中的Id
:param title: 标题
:param year: 年份
:param tmdb_id: TMDBID
@ -258,16 +284,17 @@ class Jellyfin(metaclass=Singleton):
if not self._host or not self._apikey or not self._user:
return None
# 查TVID
item_id = self.__get_jellyfin_series_id_by_name(title, year)
if item_id is None:
return None
if not item_id:
return {}
# 验证tmdbid是否相同
item_tmdbid = self.get_iteminfo(item_id).get("ProviderIds", {}).get("Tmdb")
if tmdb_id and item_tmdbid:
if str(tmdb_id) != str(item_tmdbid):
item_id = self.__get_jellyfin_series_id_by_name(title, year)
if item_id is None:
return None
if not item_id:
return {}
# 验证tmdbid是否相同
item_tmdbid = self.get_iteminfo(item_id).get("ProviderIds", {}).get("Tmdb")
if tmdb_id and item_tmdbid:
if str(tmdb_id) != str(item_tmdbid):
return {}
if not season:
season = ""
try:
@ -338,6 +365,24 @@ class Jellyfin(metaclass=Singleton):
logger.error(f"连接Library/Refresh出错" + str(e))
return False
def get_webhook_message(self, message: dict) -> dict:
"""
解析Jellyfin报文
"""
eventItem = {'event': message.get('NotificationType', ''),
'item_name': message.get('Name'),
'user_name': message.get('NotificationUsername'),
"channel": "jellyfin"
}
# 获取消息图片
if eventItem.get("item_id"):
# 根据返回的item_id去调用媒体服务器获取
eventItem['image_url'] = self.get_remote_image_by_id(item_id=eventItem.get('item_id'),
image_type="Backdrop")
return eventItem
def get_iteminfo(self, itemid: str) -> dict:
"""
获取单个项目详情
@ -356,20 +401,38 @@ class Jellyfin(metaclass=Singleton):
logger.error(f"连接Users/Items出错" + str(e))
return {}
def get_webhook_message(self, message: dict) -> dict:
def get_items(self, parent: str) -> Generator:
"""
解析Jellyfin报文
获取媒体服务器所有媒体库列表
"""
eventItem = {'event': message.get('NotificationType', ''),
'item_name': message.get('Name'),
'user_name': message.get('NotificationUsername'),
"channel": "jellyfin"
}
# 获取消息图片
if eventItem.get("item_id"):
# 根据返回的item_id去调用媒体服务器获取
eventItem['image_url'] = self.get_remote_image_by_id(item_id=eventItem.get('item_id'),
image_type="Backdrop")
return eventItem
if not parent:
yield {}
if not self._host or not self._apikey:
yield {}
req_url = "%sUsers/%s/Items?parentId=%s&api_key=%s" % (self._host, self._user, parent, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res and res.status_code == 200:
results = res.json().get("Items") or []
for result in results:
if not result:
continue
if result.get("Type") in ["Movie", "Series"]:
item_info = self.get_iteminfo(result.get("Id"))
yield {"id": result.get("Id"),
"library": item_info.get("ParentId"),
"type": item_info.get("Type"),
"title": item_info.get("Name"),
"original_title": item_info.get("OriginalTitle"),
"year": item_info.get("ProductionYear"),
"tmdbid": item_info.get("ProviderIds", {}).get("Tmdb"),
"imdbid": item_info.get("ProviderIds", {}).get("Imdb"),
"tvdbid": item_info.get("ProviderIds", {}).get("Tvdb"),
"path": item_info.get("Path"),
"json": str(item_info)}
elif "Folder" in result.get("Type"):
for item in self.get_items(result.get("Id")):
yield item
except Exception as e:
logger.error(f"连接Users/Items出错" + str(e))
yield {}

View File

@ -1,5 +1,5 @@
from pathlib import Path
from typing import Optional, Tuple, Union, Any
from typing import Optional, Tuple, Union, Any, List, Generator
from app import schemas
from app.core.context import MediaInfo
@ -86,3 +86,50 @@ class PlexModule(_ModuleBase):
episode_count=media_statistic.get("EpisodeCount") or 0,
user_count=1
)
def mediaserver_librarys(self) -> List[schemas.MediaServerLibrary]:
"""
媒体库列表
"""
librarys = self.plex.get_librarys()
if not librarys:
return []
return [schemas.MediaServerLibrary(
server="plex",
id=library.get("id"),
name=library.get("name"),
type=library.get("type"),
path=library.get("path")
) for library in librarys]
def mediaserver_items(self, library_id: str) -> Generator:
"""
媒体库项目列表
"""
items = self.plex.get_items(library_id)
for item in items:
yield schemas.MediaServerItem(
server="plex",
library=item.get("library"),
item_id=item.get("id"),
item_type=item.get("type"),
title=item.get("title"),
original_title=item.get("original_title"),
year=item.get("year"),
tmdbid=item.get("tmdbid"),
imdbid=item.get("imdbid"),
tvdbid=item.get("tvdbid"),
path=item.get("path"),
)
def mediaserver_tv_episodes(self, item_id: Union[str, int]) -> List[schemas.MediaServerSeasonInfo]:
"""
获取剧集信息
"""
seasoninfo = self.plex.get_tv_episodes(item_id=item_id)
if not seasoninfo:
return []
return [schemas.MediaServerSeasonInfo(
season=season,
episodes=episodes
) for season, episodes in seasoninfo.items()]

View File

@ -1,6 +1,6 @@
import json
from pathlib import Path
from typing import List, Optional, Dict, Tuple
from typing import List, Optional, Dict, Tuple, Generator
from urllib.parse import quote_plus
from plexapi import media
@ -8,7 +8,7 @@ from plexapi.server import PlexServer
from app.core.config import settings
from app.log import logger
from app.schemas import RefreshMediaItem
from app.schemas import RefreshMediaItem, MediaType
from app.utils.singleton import Singleton
@ -30,6 +30,34 @@ class Plex(metaclass=Singleton):
self._plex = None
logger.error(f"Plex服务器连接失败{str(e)}")
def get_librarys(self):
"""
获取媒体服务器所有媒体库列表
"""
if not self._plex:
return []
try:
self._libraries = self._plex.library.sections()
except Exception as err:
logger.error(f"获取媒体服务器所有媒体库列表出错:{str(err)}")
return []
libraries = []
for library in self._libraries:
match library.type:
case "movie":
library_type = MediaType.MOVIE.value
case "show":
library_type = MediaType.TV.value
case _:
continue
libraries.append({
"id": library.key,
"name": library.title,
"path": library.locations,
"type": library_type
})
return libraries
def get_activity_log(self, num: int = 30) -> Optional[List[dict]]:
"""
获取Plex活动记录
@ -111,11 +139,13 @@ class Plex(metaclass=Singleton):
return ret_movies
def get_tv_episodes(self,
item_id: str = None,
title: str = None,
year: str = None,
season: int = None) -> Optional[Dict[str, list]]:
season: int = None) -> Optional[Dict[int, list]]:
"""
根据标题、年份、季查询电视剧所有集信息
:param item_id: 媒体ID
:param title: 标题
:param year: 年份,可以为空,为空时不按年份过滤
:param season: 季号,数字
@ -123,7 +153,10 @@ class Plex(metaclass=Singleton):
"""
if not self._plex:
return {}
videos = self._plex.library.search(title=title, year=year, libtype="show")
if item_id:
videos = self._plex.library.sectionByID(item_id).all()
else:
videos = self._plex.library.search(title=title, year=year, libtype="show")
if not videos:
return {}
episodes = videos[0].episodes()
@ -252,6 +285,38 @@ class Plex(metaclass=Singleton):
break
return ids
def get_items(self, parent: str) -> Generator:
"""
获取媒体服务器所有媒体库列表
"""
if not parent:
yield {}
if not self._plex:
yield {}
try:
section = self._plex.library.sectionByID(parent)
if section:
for item in section.all():
if not item:
continue
ids = self.__get_ids(item.guids)
path = None
if item.locations:
path = item.locations[0]
yield {"id": item.key,
"library": item.librarySectionID,
"type": item.type,
"title": item.title,
"original_title": item.originalTitle,
"year": item.year,
"tmdbid": ids['tmdb_id'],
"imdbid": ids['imdb_id'],
"tvdbid": ids['tvdb_id'],
"path": path}
except Exception as err:
logger.error(f"获取媒体库列表出错:{err}")
yield {}
def get_webhook_message(self, message_str: str) -> dict:
"""
解析Plex报文