diff --git a/app/chain/webhook.py b/app/chain/webhook.py index fc01f788..a158143b 100644 --- a/app/chain/webhook.py +++ b/app/chain/webhook.py @@ -1,11 +1,8 @@ -import time from typing import Any from app.chain import ChainBase -from app.schemas import Notification -from app.schemas.types import EventType, MediaImageType, MediaType, NotificationType +from app.schemas.types import EventType from app.utils.singleton import Singleton -from app.utils.web import WebUtils class WebhookChain(ChainBase, metaclass=Singleton): @@ -15,7 +12,7 @@ class WebhookChain(ChainBase, metaclass=Singleton): def message(self, body: Any, form: Any, args: Any) -> None: """ - 处理Webhook报文并发送消息 + 处理Webhook报文并发送事件 """ # 获取主体内容 event_info = self.webhook_parser(body=body, form=form, args=args) @@ -23,76 +20,3 @@ class WebhookChain(ChainBase, metaclass=Singleton): return # 广播事件 self.eventmanager.send_event(EventType.WebhookMessage, event_info) - # 拼装消息内容 - _webhook_actions = { - "library.new": "新入库", - "system.webhooktest": "测试", - "playback.start": "开始播放", - "playback.stop": "停止播放", - "user.authenticated": "登录成功", - "user.authenticationfailed": "登录失败", - "media.play": "开始播放", - "media.stop": "停止播放", - "PlaybackStart": "开始播放", - "PlaybackStop": "停止播放", - "item.rate": "标记了" - } - _webhook_images = { - "emby": "https://emby.media/notificationicon.png", - "plex": "https://www.plex.tv/wp-content/uploads/2022/04/new-logo-process-lines-gray.png", - "jellyfin": "https://play-lh.googleusercontent.com/SCsUK3hCCRqkJbmLDctNYCfehLxsS4ggD1ZPHIFrrAN1Tn9yhjmGMPep2D9lMaaa9eQi" - } - - if not _webhook_actions.get(event_info.event): - return - - # 消息标题 - if event_info.item_type in ["TV", "SHOW"]: - message_title = f"{_webhook_actions.get(event_info.event)}剧集 {event_info.item_name}" - elif event_info.item_type == "MOV": - message_title = f"{_webhook_actions.get(event_info.event)}电影 {event_info.item_name}" - elif event_info.item_type == "AUD": - message_title = f"{_webhook_actions.get(event_info.event)}有声书 {event_info.item_name}" - else: - message_title = f"{_webhook_actions.get(event_info.event)}" - - # 消息内容 - message_texts = [] - if event_info.user_name: - message_texts.append(f"用户:{event_info.user_name}") - if event_info.device_name: - message_texts.append(f"设备:{event_info.client} {event_info.device_name}") - if event_info.ip: - message_texts.append(f"IP地址:{event_info.ip} {WebUtils.get_location(event_info.ip)}") - if event_info.percentage: - percentage = round(float(event_info.percentage), 2) - message_texts.append(f"进度:{percentage}%") - if event_info.overview: - message_texts.append(f"剧情:{event_info.overview}") - message_texts.append(f"时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}") - - # 消息内容 - message_content = "\n".join(message_texts) - - # 消息图片 - image_url = event_info.image_url - # 查询剧集图片 - if (event_info.tmdb_id - and event_info.season_id - and event_info.episode_id): - specific_image = self.obtain_specific_image( - mediaid=event_info.tmdb_id, - mtype=MediaType.TV, - image_type=MediaImageType.Backdrop, - season=event_info.season_id, - episode=event_info.episode_id - ) - if specific_image: - image_url = specific_image - # 使用默认图片 - if not image_url: - image_url = _webhook_images.get(event_info.channel) - - # 发送消息 - self.post_message(Notification(mtype=NotificationType.MediaServer, - title=message_title, text=message_content, image=image_url)) diff --git a/app/plugins/mediaservermsg/__init__.py b/app/plugins/mediaservermsg/__init__.py new file mode 100644 index 00000000..efbea9d5 --- /dev/null +++ b/app/plugins/mediaservermsg/__init__.py @@ -0,0 +1,223 @@ +import time +from typing import Any, List, Dict, Tuple + +from app.core.event import eventmanager, Event +from app.log import logger +from app.plugins import _PluginBase +from app.schemas import WebhookEventInfo +from app.schemas.types import EventType, MediaType, MediaImageType, NotificationType +from app.utils.web import WebUtils + + +class MediaServerMsg(_PluginBase): + # 插件名称 + plugin_name = "媒体服务器通知" + # 插件描述 + plugin_desc = "发送媒体服务器播入、入库等通知消息。" + # 插件图标 + plugin_icon = "mediaplay.png" + # 主题色 + plugin_color = "#42A3DB" + # 插件版本 + plugin_version = "1.0" + # 插件作者 + plugin_author = "jxxghp" + # 作者主页 + author_url = "https://github.com/jxxghp" + # 插件配置项ID前缀 + plugin_config_prefix = "mediaservermsg_" + # 加载顺序 + plugin_order = 14 + # 可使用的用户级别 + auth_level = 1 + + # 私有属性 + _enabled = False + _types = [] + + # 拼装消息内容 + _webhook_actions = { + "library.new": "新入库", + "system.webhooktest": "测试", + "playback.start": "开始播放", + "playback.stop": "停止播放", + "user.authenticated": "登录成功", + "user.authenticationfailed": "登录失败", + "media.play": "开始播放", + "media.stop": "停止播放", + "PlaybackStart": "开始播放", + "PlaybackStop": "停止播放", + "item.rate": "标记了" + } + _webhook_images = { + "emby": "https://emby.media/notificationicon.png", + "plex": "https://www.plex.tv/wp-content/uploads/2022/04/new-logo-process-lines-gray.png", + "jellyfin": "https://play-lh.googleusercontent.com/SCsUK3hCCRqkJbmLDctNYCfehLxsS4ggD1ZPHIFrrAN1Tn9yhjmGMPep2D9lMaaa9eQi" + } + + def init_plugin(self, config: dict = None): + if config: + self._enabled = config.get("enabled") + self._types = config.get("types") or [] + + def get_state(self) -> bool: + return self._enabled + + @staticmethod + def get_command() -> List[Dict[str, Any]]: + pass + + def get_api(self) -> List[Dict[str, Any]]: + pass + + def get_form(self) -> Tuple[List[dict], Dict[str, Any]]: + """ + 拼装插件配置页面,需要返回两块数据:1、页面配置;2、数据结构 + """ + types_options = [ + {"title": "新入库", "value": "library.new"}, + {"title": "开始播放", "value": "playback.start|media.play|PlaybackStart"}, + {"title": "停止播放", "value": "playback.stop|media.stop|PlaybackStop"}, + {"title": "用户标记", "value": "item.rate"}, + {"title": "测试", "value": "system.webhooktest"}, + ] + return [ + { + 'component': 'VForm', + 'content': [ + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 6 + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'enabled', + 'label': '启用插件', + } + } + ] + } + ] + }, + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + }, + 'content': [ + { + 'component': 'VSelect', + 'props': { + 'chips': True, + 'multiple': True, + 'model': 'types', + 'label': '消息类型', + 'items': types_options + } + } + ] + } + ] + }, + ] + } + ], { + "enabled": False, + "types": [] + } + + def get_page(self) -> List[dict]: + pass + + @eventmanager.register(EventType.WebhookMessage) + def send(self, event: Event): + """ + 发送通知消息 + """ + if not self._enabled: + return + + event_info: WebhookEventInfo = event.event_data + if not event_info: + return + + # 不在支持范围不处理 + if not self._webhook_actions.get(event_info.event): + return + + # 不在选中范围不处理 + msgflag = False + for _type in self._types: + if event_info.event in _type.split("|"): + msgflag = True + break + if not msgflag: + logger.info(f"未开启 {event_info.event} 类型的消息通知") + return + + # 消息标题 + if event_info.item_type in ["TV", "SHOW"]: + message_title = f"{self._webhook_actions.get(event_info.event)}剧集 {event_info.item_name}" + elif event_info.item_type == "MOV": + message_title = f"{self._webhook_actions.get(event_info.event)}电影 {event_info.item_name}" + elif event_info.item_type == "AUD": + message_title = f"{self._webhook_actions.get(event_info.event)}有声书 {event_info.item_name}" + else: + message_title = f"{self._webhook_actions.get(event_info.event)}" + + # 消息内容 + message_texts = [] + if event_info.user_name: + message_texts.append(f"用户:{event_info.user_name}") + if event_info.device_name: + message_texts.append(f"设备:{event_info.client} {event_info.device_name}") + if event_info.ip: + message_texts.append(f"IP地址:{event_info.ip} {WebUtils.get_location(event_info.ip)}") + if event_info.percentage: + percentage = round(float(event_info.percentage), 2) + message_texts.append(f"进度:{percentage}%") + if event_info.overview: + message_texts.append(f"剧情:{event_info.overview}") + message_texts.append(f"时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}") + + # 消息内容 + message_content = "\n".join(message_texts) + + # 消息图片 + image_url = event_info.image_url + # 查询剧集图片 + if (event_info.tmdb_id + and event_info.season_id + and event_info.episode_id): + specific_image = self.chain.obtain_specific_image( + mediaid=event_info.tmdb_id, + mtype=MediaType.TV, + image_type=MediaImageType.Backdrop, + season=event_info.season_id, + episode=event_info.episode_id + ) + if specific_image: + image_url = specific_image + # 使用默认图片 + if not image_url: + image_url = self._webhook_images.get(event_info.channel) + + # 发送消息 + self.post_message(mtype=NotificationType.MediaServer, + title=message_title, text=message_content, image=image_url) + + def stop_service(self): + """ + 退出插件 + """ + pass