feat:mediaserver apis

This commit is contained in:
jxxghp 2024-01-02 20:54:54 +08:00
parent 4d51459a47
commit 322c72ab54
8 changed files with 356 additions and 0 deletions

View File

@ -44,6 +44,18 @@ class MediaServerChain(ChainBase):
"""
return self.run_module("mediaserver_tv_episodes", server=server, item_id=item_id)
def playing(self, server: str, count: int = 20) -> List[schemas.MediaServerPlayItem]:
"""
获取媒体服务器正在播放信息
"""
return self.run_module("mediaserver_playing", server=server, count=count)
def latest(self, server: str, count: int = 20) -> List[schemas.MediaServerPlayItem]:
"""
获取媒体服务器最新入库条目
"""
return self.run_module("mediaserver_latest", server=server, count=count)
def sync(self):
"""
同步媒体库所有数据到本地数据库

View File

@ -141,3 +141,19 @@ class EmbyModule(_ModuleBase):
season=season,
episodes=episodes
) for season, episodes in seasoninfo.items()]
def mediaserver_playing(self, server: str, count: int = 20) -> List[schemas.MediaServerPlayItem]:
"""
获取媒体服务器正在播放信息
"""
if server != "emby":
return []
return self.emby.get_resume(count)
def mediaserver_latest(self, server: str, count: int = 20) -> List[schemas.MediaServerPlayItem]:
"""
获取媒体服务器最新入库条目
"""
if server != "emby":
return []
return self.emby.get_latest(count)

View File

@ -25,6 +25,7 @@ class Emby(metaclass=Singleton):
self._apikey = settings.EMBY_API_KEY
self.user = self.get_user(settings.SUPERUSER)
self.folders = self.get_emby_folders()
self.serverid = self.get_server_id()
def is_inactive(self) -> bool:
"""
@ -907,3 +908,118 @@ class Emby(metaclass=Singleton):
except Exception as e:
logger.error(f"连接Emby出错" + str(e))
return None
def __get_play_url(self, item_id: str) -> str:
"""
拼装媒体播放链接
:param item_id: 媒体的的ID
"""
return f"{self._host}web/index.html#!/item?id={item_id}&context=home&serverId={self.serverid}"
def __get_backdrop_url(self, item_id: str, image_tag: str) -> str:
"""
获取Emby的Backdrop图片地址
:param: item_id: 在Emby中的ID
:param: image_tag: 图片的tag
:param: remote 是否远程使用TG微信等客户端调用应为True
:param: inner 是否NT内部调用为True是会使用NT中转
"""
if not self._host or not self._apikey:
return ""
if not image_tag or not item_id:
return ""
return f"{self._host}Items/{item_id}/" \
f"Images/Backdrop?tag={image_tag}&fillWidth=666&api_key={self._apikey}"
def __get_local_image_by_id(self, item_id: str) -> str:
"""
根据ItemId从媒体服务器查询本地图片地址
:param: item_id: 在Emby中的ID
:param: remote 是否远程使用TG微信等客户端调用应为True
:param: inner 是否NT内部调用为True是会使用NT中转
"""
if not self._host or not self._apikey:
return ""
return "%sItems/%s/Images/Primary" % (self._host, item_id)
def get_resume(self, num: int = 12) -> Optional[List[schemas.MediaServerPlayItem]]:
"""
获得继续观看
"""
if not self._host or not self._apikey:
return None
req_url = f"{self._host}Users/{self.user}/Items/Resume?Limit={num}&MediaTypes=Video&api_key={self._apikey}"
try:
res = RequestUtils().get_res(req_url)
if res:
result = res.json().get("Items") or []
ret_resume = []
for item in result:
if item.get("Type") not in ["Movie", "Episode"]:
continue
item_type = MediaType.MOVIE.value if item.get("Type") == "Movie" else MediaType.TV.value
link = self.__get_play_url(item.get("Id"))
if item_type == MediaType.MOVIE.value:
title = item.get("Name")
else:
if item.get("ParentIndexNumber") == 1:
title = f'{item.get("SeriesName")}{item.get("IndexNumber")}'
else:
title = f'{item.get("SeriesName")}{item.get("ParentIndexNumber")}季第{item.get("IndexNumber")}'
if item_type == MediaType.MOVIE.value:
if item.get("BackdropImageTags"):
image = self.__get_backdrop_url(item_id=item.get("Id"),
image_tag=item.get("BackdropImageTags")[0])
else:
image = self.__get_local_image_by_id(item.get("Id"))
else:
image = self.__get_backdrop_url(item_id=item.get("SeriesId"),
image_tag=item.get("SeriesPrimaryImageTag"))
if not image:
image = self.__get_local_image_by_id(item.get("SeriesId"))
ret_resume.append(schemas.MediaServerPlayItem(
id=item.get("Id"),
name=title,
type=item_type,
image=image,
link=link,
percent=item.get("UserData", {}).get("PlayedPercentage")
))
return ret_resume
else:
logger.error(f"Users/Items/Resume 未获取到返回数据")
except Exception as e:
logger.error(f"连接Users/Items/Resume出错" + str(e))
return []
def get_latest(self, num: int = 20) -> Optional[List[schemas.MediaServerPlayItem]]:
"""
获得最近更新
"""
if not self._host or not self._apikey:
return None
req_url = f"{self._host}Users/{self.user}/Items/Latest?Limit={num}&MediaTypes=Video&api_key={self._apikey}"
try:
res = RequestUtils().get_res(req_url)
if res:
result = res.json() or []
ret_latest = []
for item in result:
if item.get("Type") not in ["Movie", "Series"]:
continue
item_type = MediaType.MOVIE.value if item.get("Type") == "Movie" else MediaType.TV.value
link = self.__get_play_url(item.get("Id"))
image = self.__get_local_image_by_id(item_id=item.get("Id"))
ret_latest.append(schemas.MediaServerPlayItem(
id=item.get("Id"),
name=item.get("Name"),
type=item_type,
image=image,
link=link
))
return ret_latest
else:
logger.error(f"Users/Items/Latest 未获取到返回数据")
except Exception as e:
logger.error(f"连接Users/Items/Latest出错" + str(e))
return []

View File

@ -139,3 +139,19 @@ class JellyfinModule(_ModuleBase):
season=season,
episodes=episodes
) for season, episodes in seasoninfo.items()]
def mediaserver_playing(self, server: str, count: int = 20) -> List[schemas.MediaServerPlayItem]:
"""
获取媒体服务器正在播放信息
"""
if server != "jellyfin":
return []
return self.jellyfin.get_resume(count)
def mediaserver_latest(self, server: str, count: int = 20) -> List[schemas.MediaServerPlayItem]:
"""
获取媒体服务器最新入库条目
"""
if server != "jellyfin":
return []
return self.jellyfin.get_latest(count)

View File

@ -587,3 +587,112 @@ class Jellyfin(metaclass=Singleton):
except Exception as e:
logger.error(f"连接Jellyfin出错" + str(e))
return None
def __get_play_url(self, item_id: str) -> str:
"""
拼装媒体播放链接
:param item_id: 媒体的的ID
"""
return f"{self._host}web/index.html#!/details?id={item_id}&serverId={self.serverid}"
def __get_local_image_by_id(self, item_id: str) -> str:
"""
根据ItemId从媒体服务器查询有声书图片地址
:param: item_id: 在Emby中的ID
:param: remote 是否远程使用TG微信等客户端调用应为True
:param: inner 是否NT内部调用为True是会使用NT中转
"""
if not self._host or not self._apikey:
return ""
return "%sItems/%s/Images/Primary" % (self._host, item_id)
def __get_backdrop_url(self, item_id: str, image_tag: str) -> str:
"""
获取Backdrop图片地址
:param: item_id: 在Emby中的ID
:param: image_tag: 图片的tag
:param: remote 是否远程使用TG微信等客户端调用应为True
:param: inner 是否NT内部调用为True是会使用NT中转
"""
if not self._host or not self._apikey:
return ""
if not image_tag or not item_id:
return ""
return f"{self._host}Items/{item_id}/" \
f"Images/Backdrop?tag={image_tag}&fillWidth=666&api_key={self._apikey}"
def get_resume(self, num: int = 12) -> Optional[List[schemas.MediaServerPlayItem]]:
"""
获得继续观看
"""
if not self._host or not self._apikey:
return None
req_url = f"{self._host}Users/{self.user}/Items/Resume?Limit={num}&MediaTypes=Video&api_key={self._apikey}"
try:
res = RequestUtils().get_res(req_url)
if res:
result = res.json().get("Items") or []
ret_resume = []
for item in result:
if item.get("Type") not in ["Movie", "Episode"]:
continue
item_type = MediaType.MOVIE.value if item.get("Type") == "Movie" else MediaType.TV.value
link = self.__get_play_url(item.get("Id"))
if item.get("BackdropImageTags"):
image = self.__get_backdrop_url(item_id=item.get("Id"),
image_tag=item.get("BackdropImageTags")[0])
else:
image = self.__get_local_image_by_id(item.get("Id"))
if item_type == MediaType.MOVIE.value:
title = item.get("Name")
else:
if item.get("ParentIndexNumber") == 1:
title = f'{item.get("SeriesName")}{item.get("IndexNumber")}'
else:
title = f'{item.get("SeriesName")}{item.get("ParentIndexNumber")}季第{item.get("IndexNumber")}'
ret_resume.append(schemas.MediaServerPlayItem(
id=item.get("Id"),
name=title,
type=item_type,
image=image,
link=link,
percent=item.get("UserData", {}).get("PlayedPercentage")
))
return ret_resume
else:
logger.error(f"Users/Items/Resume 未获取到返回数据")
except Exception as e:
logger.error(f"连接Users/Items/Resume出错" + str(e))
return []
def get_latest(self, num=20) -> Optional[List[schemas.MediaServerPlayItem]]:
"""
获得最近更新
"""
if not self._host or not self._apikey:
return None
req_url = f"{self._host}Users/{self.user}/Items/Latest?Limit={num}&MediaTypes=Video&api_key={self._apikey}"
try:
res = RequestUtils().get_res(req_url)
if res:
result = res.json() or []
ret_latest = []
for item in result:
if item.get("Type") not in ["Movie", "Series"]:
continue
item_type = MediaType.MOVIE.value if item.get("Type") == "Movie" else MediaType.TV.value
link = self.__get_play_url(item.get("Id"))
image = self.__get_local_image_by_id(item_id=item.get("Id"))
ret_latest.append(schemas.MediaServerPlayItem(
id=item.get("Id"),
name=item.get("Name"),
type=item_type,
image=image,
link=link
))
return ret_latest
else:
logger.error(f"Users/Items/Latest 未获取到返回数据")
except Exception as e:
logger.error(f"连接Users/Items/Latest出错" + str(e))
return []

View File

@ -133,3 +133,19 @@ class PlexModule(_ModuleBase):
season=season,
episodes=episodes
) for season, episodes in seasoninfo.items()]
def mediaserver_playing(self, server: str, count: int = 20) -> List[schemas.MediaServerPlayItem]:
"""
获取媒体服务器正在播放信息
"""
if server != "plex":
return []
return self.plex.get_resume(count)
def mediaserver_latest(self, server: str, count: int = 20) -> List[schemas.MediaServerPlayItem]:
"""
获取媒体服务器最新入库条目
"""
if server != "plex":
return []
return self.plex.get_latest(count)

View File

@ -543,3 +543,62 @@ class Plex(metaclass=Singleton):
获取plex对象以便直接操作
"""
return self._plex
def __get_play_url(self, item_id: str) -> str:
"""
拼装媒体播放链接
:param item_id: 媒体的的ID
"""
return f'{self._host}#!/server/{self._plex.machineIdentifier}/details?key={item_id}'
def get_resume(self, num: int = 12) -> Optional[List[schemas.MediaServerPlayItem]]:
"""
获取继续观看的媒体
"""
if not self._plex:
return []
items = self._plex.fetchItems('/hubs/continueWatching/items', container_start=0, container_size=num)
ret_resume = []
for item in items:
item_type = MediaType.MOVIE.value if item.TYPE == "movie" else MediaType.TV.value
if item_type == MediaType.MOVIE.value:
name = item.title
else:
if item.parentIndex == 1:
name = "%s%s" % (item.grandparentTitle, item.index)
else:
name = "%s%s季第%s" % (item.grandparentTitle, item.parentIndex, item.index)
link = self.__get_play_url(item.key)
image = item.artUrl
ret_resume.append(schemas.MediaServerPlayItem(
id=item.key,
name=name,
type=item_type,
image=image,
link=link,
percent=item.viewOffset / item.duration * 100 if item.viewOffset and item.duration else 0
))
return ret_resume
def get_latest(self, num: int = 20) -> Optional[List[schemas.MediaServerPlayItem]]:
"""
获取最近添加媒体
"""
if not self._plex:
return None
items = self._plex.fetchItems('/library/recentlyAdded', container_start=0, container_size=num)
ret_resume = []
for item in items:
item_type = MediaType.MOVIE.value if item.TYPE == "movie" else MediaType.TV.value
link = self.__get_play_url(item.key)
title = item.title if item_type == MediaType.MOVIE.value else \
"%s%s" % (item.parentTitle, item.index)
image = item.posterUrl
ret_resume.append(schemas.MediaServerPlayItem(
id=item.key,
name=title,
type=item_type,
image=image,
link=link
))
return ret_resume

View File

@ -139,3 +139,15 @@ class WebhookEventInfo(BaseModel):
save_reason: Optional[str] = None
item_isvirtual: Optional[bool] = None
media_type: Optional[str] = None
class MediaServerPlayItem(BaseModel):
"""
媒体服务器可播放项目信息
"""
id: Optional[Union[str, int]] = None
name: Optional[str] = None
type: Optional[str] = None
image: Optional[str] = None
link: Optional[str] = None
percent: Optional[float] = None