2023-11-01 15:53:45 +08:00

224 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
from typing import Any, List, Dict, Tuple
from app.core.event import eventmanager, Event
from app.log import logger
from app.plugins import _PluginBase
from app.schemas import WebhookEventInfo
from app.schemas.types import EventType, MediaType, MediaImageType, NotificationType
from app.utils.web import WebUtils
class MediaServerMsg(_PluginBase):
# 插件名称
plugin_name = "媒体服务器通知"
# 插件描述
plugin_desc = "发送媒体服务器播入、入库等通知消息。"
# 插件图标
plugin_icon = "mediaplay.png"
# 主题色
plugin_color = "#42A3DB"
# 插件版本
plugin_version = "1.0"
# 插件作者
plugin_author = "jxxghp"
# 作者主页
author_url = "https://github.com/jxxghp"
# 插件配置项ID前缀
plugin_config_prefix = "mediaservermsg_"
# 加载顺序
plugin_order = 14
# 可使用的用户级别
auth_level = 1
# 私有属性
_enabled = False
_types = []
# 拼装消息内容
_webhook_actions = {
"library.new": "新入库",
"system.webhooktest": "测试",
"playback.start": "开始播放",
"playback.stop": "停止播放",
"user.authenticated": "登录成功",
"user.authenticationfailed": "登录失败",
"media.play": "开始播放",
"media.stop": "停止播放",
"PlaybackStart": "开始播放",
"PlaybackStop": "停止播放",
"item.rate": "标记了"
}
_webhook_images = {
"emby": "https://emby.media/notificationicon.png",
"plex": "https://www.plex.tv/wp-content/uploads/2022/04/new-logo-process-lines-gray.png",
"jellyfin": "https://play-lh.googleusercontent.com/SCsUK3hCCRqkJbmLDctNYCfehLxsS4ggD1ZPHIFrrAN1Tn9yhjmGMPep2D9lMaaa9eQi"
}
def init_plugin(self, config: dict = None):
if config:
self._enabled = config.get("enabled")
self._types = config.get("types") or []
def get_state(self) -> bool:
return self._enabled
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
def get_api(self) -> List[Dict[str, Any]]:
pass
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面需要返回两块数据1、页面配置2、数据结构
"""
types_options = [
{"title": "新入库", "value": "library.new"},
{"title": "开始播放", "value": "playback.start|media.play|PlaybackStart"},
{"title": "停止播放", "value": "playback.stop|media.stop|PlaybackStop"},
{"title": "用户标记", "value": "item.rate"},
{"title": "测试", "value": "system.webhooktest"},
]
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'enabled',
'label': '启用插件',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VSelect',
'props': {
'chips': True,
'multiple': True,
'model': 'types',
'label': '消息类型',
'items': types_options
}
}
]
}
]
},
]
}
], {
"enabled": False,
"types": []
}
def get_page(self) -> List[dict]:
pass
@eventmanager.register(EventType.WebhookMessage)
def send(self, event: Event):
"""
发送通知消息
"""
if not self._enabled:
return
event_info: WebhookEventInfo = event.event_data
if not event_info:
return
# 不在支持范围不处理
if not self._webhook_actions.get(event_info.event):
return
# 不在选中范围不处理
msgflag = False
for _type in self._types:
if event_info.event in _type.split("|"):
msgflag = True
break
if not msgflag:
logger.info(f"未开启 {event_info.event} 类型的消息通知")
return
# 消息标题
if event_info.item_type in ["TV", "SHOW"]:
message_title = f"{self._webhook_actions.get(event_info.event)}剧集 {event_info.item_name}"
elif event_info.item_type == "MOV":
message_title = f"{self._webhook_actions.get(event_info.event)}电影 {event_info.item_name}"
elif event_info.item_type == "AUD":
message_title = f"{self._webhook_actions.get(event_info.event)}有声书 {event_info.item_name}"
else:
message_title = f"{self._webhook_actions.get(event_info.event)}"
# 消息内容
message_texts = []
if event_info.user_name:
message_texts.append(f"用户:{event_info.user_name}")
if event_info.device_name:
message_texts.append(f"设备:{event_info.client} {event_info.device_name}")
if event_info.ip:
message_texts.append(f"IP地址{event_info.ip} {WebUtils.get_location(event_info.ip)}")
if event_info.percentage:
percentage = round(float(event_info.percentage), 2)
message_texts.append(f"进度:{percentage}%")
if event_info.overview:
message_texts.append(f"剧情:{event_info.overview}")
message_texts.append(f"时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}")
# 消息内容
message_content = "\n".join(message_texts)
# 消息图片
image_url = event_info.image_url
# 查询剧集图片
if (event_info.tmdb_id
and event_info.season_id
and event_info.episode_id):
specific_image = self.chain.obtain_specific_image(
mediaid=event_info.tmdb_id,
mtype=MediaType.TV,
image_type=MediaImageType.Backdrop,
season=event_info.season_id,
episode=event_info.episode_id
)
if specific_image:
image_url = specific_image
# 使用默认图片
if not image_url:
image_url = self._webhook_images.get(event_info.channel)
# 发送消息
self.post_message(mtype=NotificationType.MediaServer,
title=message_title, text=message_content, image=image_url)
def stop_service(self):
"""
退出插件
"""
pass