Merge pull request #2526 from InfinityPacer/main

This commit is contained in:
jxxghp 2024-07-07 16:49:11 +08:00 committed by GitHub
commit b892ef50dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 89 additions and 18 deletions

View File

@ -265,25 +265,59 @@ class Plex:
season_episodes[episode.seasonNumber].append(episode.index) season_episodes[episode.seasonNumber].append(episode.index)
return videos.key, season_episodes return videos.key, season_episodes
def get_remote_image_by_id(self, item_id: str, image_type: str) -> Optional[str]: def get_remote_image_by_id(self, item_id: str, image_type: str, depth: int = 0) -> Optional[str]:
""" """
根据ItemId从Plex查询图片地址 根据ItemId从Plex查询图片地址
:param item_id: Emby中的ID :param item_id: Plex中的ID
:param image_type: 图片的类型Poster或者Backdrop等 :param image_type: 图片的类型Poster或者Backdrop等
:param depth: 当前递归深度默认为0
:return: 图片对应在TMDB中的URL :return: 图片对应在TMDB中的URL
""" """
if not self._plex: if not self._plex or depth > 2 or not item_id:
return None return None
try: try:
image_url = None
ekey = f"/library/metadata/{item_id}"
item = self._plex.fetchItem(ekey=ekey)
if not item:
return None
# 如果配置了外网播放地址以及Token则默认从Plex媒体服务器获取图片否则返回有外网地址的图片资源
if settings.PLEX_PLAY_HOST and settings.PLEX_TOKEN:
query = {"X-Plex-Token": settings.PLEX_TOKEN}
if image_type == "Poster": if image_type == "Poster":
images = self._plex.fetchItems('/library/metadata/%s/posters' % item_id, if item.thumb:
image_url = RequestUtils.combine_url(host=settings.PLEX_PLAY_HOST, path=item.thumb, query=query)
else:
# 这里对episode进行特殊处理实际上episode的Backdrop是Poster
if item.TYPE == "episode" and item.thumb:
image_url = RequestUtils.combine_url(host=settings.PLEX_PLAY_HOST, path=item.thumb, query=query)
# 默认使用art也就是Backdrop进行处理
if not image_url and item.art:
image_url = RequestUtils.combine_url(host=settings.PLEX_PLAY_HOST, path=item.art, query=query)
else:
if image_type == "Poster":
images = self._plex.fetchItems(ekey=f"{ekey}/posters",
cls=media.Poster) cls=media.Poster)
else: else:
images = self._plex.fetchItems('/library/metadata/%s/arts' % item_id, # 这里对episode进行特殊处理实际上episode的Backdrop是Poster
images = None
if item.TYPE == "episode":
images = self._plex.fetchItems(ekey=f"{ekey}/posters",
cls=media.Poster)
# 默认使用art也就是Backdrop进行处理
if not images:
images = self._plex.fetchItems(ekey=f"{ekey}/arts",
cls=media.Art) cls=media.Art)
for image in images: for image in images:
if hasattr(image, 'key') and image.key.startswith('http'): if hasattr(image, "key") and image.key.startswith("http"):
return image.key image_url = image.key
break
# 如果最后还是找不到,则递归父级进行查找
if not image_url and hasattr(item, "parentRatingKey"):
return self.get_remote_image_by_id(item_id=item.parentRatingKey,
image_type=image_type,
depth=depth + 1)
return image_url
except Exception as e: except Exception as e:
logger.error(f"获取封面出错:" + str(e)) logger.error(f"获取封面出错:" + str(e))
return None return None

View File

@ -1,5 +1,6 @@
import re import re
import threading import threading
import uuid
from pathlib import Path from pathlib import Path
from threading import Event from threading import Event
from typing import Optional, List, Dict from typing import Optional, List, Dict
@ -199,14 +200,17 @@ class Telegram:
""" """
if image: if image:
req = RequestUtils(proxies=settings.PROXY).get_res(image) res = RequestUtils(proxies=settings.PROXY).get_res(image)
if req is None: if res is None:
raise Exception("获取图片失败") raise Exception("获取图片失败")
if req.content: if res.content:
image_file = Path(settings.TEMP_PATH) / Path(image).name # 使用随机标识构建图片文件的完整路径,并写入图片内容到文件
image_file.write_bytes(req.content) image_file = Path(settings.TEMP_PATH) / str(uuid.uuid4())
image_file.write_bytes(res.content)
photo = InputFile(image_file) photo = InputFile(image_file)
caption = re.sub(r'([_`])', r'\\\1', caption) # 对caption进行Markdown特殊字符转义
caption = re.sub(r"([_`])", r"\\\1", caption)
# 发送图片到Telegram
ret = self._bot.send_photo(chat_id=userid or self._telegram_chat_id, ret = self._bot.send_photo(chat_id=userid or self._telegram_chat_id,
photo=photo, photo=photo,
caption=caption, caption=caption,

View File

@ -1,5 +1,5 @@
from typing import Union, Any, Optional from typing import Union, Any, Optional
from urllib.parse import urljoin from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse
import requests import requests
import urllib3 import urllib3
@ -255,3 +255,36 @@ class RequestUtils:
return endpoint return endpoint
host = RequestUtils.standardize_base_url(host) host = RequestUtils.standardize_base_url(host)
return urljoin(host, endpoint) if host else endpoint return urljoin(host, endpoint) if host else endpoint
@staticmethod
def combine_url(host: str, path: Optional[str] = None, query: Optional[dict] = None) -> Optional[str]:
"""
使用给定的主机头路径和查询参数组合生成完整的URL
:param host: str, 主机头例如 https://example.com
:param path: Optional[str], 包含路径和可能已经包含的查询参数的端点例如 /path/to/resource?current=1
:param query: Optional[dict], 可选额外的查询参数例如 {"key": "value"}
:return: str, 完整的请求URL字符串
"""
try:
# 如果路径为空,则默认为 '/'
if path is None:
path = '/'
# 使用 urljoin 合并 host 和 path
url = urljoin(host, path)
# 解析当前 URL 的组成部分
url_parts = urlparse(url)
# 解析已存在的查询参数,并与额外的查询参数合并
query_params = parse_qs(url_parts.query)
if query:
for key, value in query.items():
query_params[key] = value
# 重新构建查询字符串
query_string = urlencode(query_params, doseq=True)
# 构建完整的 URL
new_url_parts = url_parts._replace(query=query_string)
complete_url = urlunparse(new_url_parts)
return str(complete_url)
except Exception as e:
logger.debug(f"Error combining URL: {e}")
return None