add webhook schemas

This commit is contained in:
jxxghp
2023-07-27 08:21:41 +08:00
parent 7699d4fc1b
commit 4039accf6d
9 changed files with 122 additions and 98 deletions

View File

@ -12,7 +12,8 @@ from app.core.event import EventManager
from app.core.meta import MetaBase
from app.core.module import ModuleManager
from app.log import logger
from app.schemas import TransferInfo, TransferTorrent, ExistMediaInfo, DownloadingTorrent, CommingMessage, Notification
from app.schemas import TransferInfo, TransferTorrent, ExistMediaInfo, DownloadingTorrent, CommingMessage, Notification, \
WebhookEventInfo
from app.schemas.types import TorrentStatus, MediaType, MediaImageType
from app.utils.object import ObjectUtils
from app.utils.singleton import AbstractSingleton, Singleton
@ -170,7 +171,7 @@ class ChainBase(AbstractSingleton, metaclass=Singleton):
"""
return self.run_module("message_parser", body=body, form=form, args=args)
def webhook_parser(self, body: Any, form: Any, args: Any) -> Optional[dict]:
def webhook_parser(self, body: Any, form: Any, args: Any) -> Optional[WebhookEventInfo]:
"""
解析Webhook报文体
:param body: 请求体

View File

@ -3,8 +3,8 @@ from typing import Any
from app.chain import ChainBase
from app.schemas import Notification
from app.utils.http import WebUtils
from app.schemas.types import EventType, MediaImageType, MediaType, NotificationType
from app.utils.http import WebUtils
class WebhookChain(ChainBase):
@ -17,7 +17,7 @@ class WebhookChain(ChainBase):
处理Webhook报文并发送消息
"""
# 获取主体内容
event_info: dict = self.webhook_parser(body=body, form=form, args=args)
event_info = self.webhook_parser(body=body, form=form, args=args)
if not event_info:
return
# 广播事件
@ -42,55 +42,55 @@ class WebhookChain(ChainBase):
"jellyfin": "https://play-lh.googleusercontent.com/SCsUK3hCCRqkJbmLDctNYCfehLxsS4ggD1ZPHIFrrAN1Tn9yhjmGMPep2D9lMaaa9eQi"
}
if not _webhook_actions.get(event_info.get('event')):
if not _webhook_actions.get(event_info.event):
return
# 消息标题
if event_info.get('item_type') in ["TV", "SHOW"]:
message_title = f"{_webhook_actions.get(event_info.get('event'))}剧集 {event_info.get('item_name')}"
elif event_info.get('item_type') == "MOV":
message_title = f"{_webhook_actions.get(event_info.get('event'))}电影 {event_info.get('item_name')}"
elif event_info.get('item_type') == "AUD":
message_title = f"{_webhook_actions.get(event_info.get('event'))}有声书 {event_info.get('item_name')}"
if event_info.item_type in ["TV", "SHOW"]:
message_title = f"{_webhook_actions.get(event_info.event)}剧集 {event_info.item_name}"
elif event_info.item_type == "MOV":
message_title = f"{_webhook_actions.get(event_info.event)}电影 {event_info.item_name}"
elif event_info.item_type == "AUD":
message_title = f"{_webhook_actions.get(event_info.event)}有声书 {event_info.item_name}"
else:
message_title = f"{_webhook_actions.get(event_info.get('event'))}"
message_title = f"{_webhook_actions.get(event_info.event)}"
# 消息内容
message_texts = []
if event_info.get('user_name'):
message_texts.append(f"用户:{event_info.get('user_name')}")
if event_info.get('device_name'):
message_texts.append(f"设备:{event_info.get('client')} {event_info.get('device_name')}")
if event_info.get('ip'):
message_texts.append(f"IP地址{event_info.get('ip')} {WebUtils.get_location(event_info.get('ip'))}")
if event_info.get('percentage'):
percentage = round(float(event_info.get('percentage')), 2)
if event_info.user_name:
message_texts.append(f"用户:{event_info.user_name}")
if event_info.device_name:
message_texts.append(f"设备:{event_info.client} {event_info.device_name}")
if event_info.ip:
message_texts.append(f"IP地址{event_info.ip} {WebUtils.get_location(event_info.ip)}")
if event_info.percentage:
percentage = round(float(event_info.percentage), 2)
message_texts.append(f"进度:{percentage}%")
if event_info.get('overview'):
message_texts.append(f"剧情:{event_info.get('overview')}")
if event_info.overview:
message_texts.append(f"剧情:{event_info.overview}")
message_texts.append(f"时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}")
# 消息内容
message_content = "\n".join(message_texts)
# 消息图片
image_url = event_info.get("image_url")
image_url = event_info.image_url
# 查询剧集图片
if event_info.get("tmdb_id") \
and event_info.get("season_id"):
mtype = MediaType.TV if event_info.get("item_type") == "TV" else MediaType.MOVIE
if event_info.tmdb_id \
and event_info.season_id:
mtype = MediaType.TV if event_info.item_type == "TV" else MediaType.MOVIE
specific_image = self.obtain_specific_image(
mediaid=event_info.get("tmdb_id"),
mediaid=event_info.tmdb_id,
mtype=mtype,
image_type=MediaImageType.Backdrop,
season=event_info.get("season_id"),
episode=event_info.get("episode_id")
season=event_info.season_id,
episode=event_info.episode_id
)
if specific_image:
image_url = specific_image
# 使用默认图片
if not image_url:
image_url = _webhook_images.get(event_info.get("channel"))
image_url = _webhook_images.get(event_info.channel)
# 发送消息
self.post_message(Notification(mtype=NotificationType.MediaServer,