fix webhooks

This commit is contained in:
jxxghp
2023-06-11 20:44:51 +08:00
parent 738335d16c
commit 14d3ff6477
8 changed files with 205 additions and 15 deletions

View File

@ -1,6 +1,8 @@
import time
from typing import Any
from app.chain import ChainBase
from app.utils.http import WebUtils
class WebhookMessageChain(ChainBase):
@ -13,8 +15,65 @@ class WebhookMessageChain(ChainBase):
处理Webhook报文并发送消息
"""
# 获取主体内容
info: dict = self.webhook_parser(body=body, form=form, args=args)
if not info:
event_info: dict = self.webhook_parser(body=body, form=form, args=args)
if not event_info:
return
# 拼装消息内容
_webhook_actions = {
"library.new": "新入库",
"system.webhooktest": "测试",
"playback.start": "开始播放",
"playback.stop": "停止播放",
"user.authenticated": "登录成功",
"user.authenticationfailed": "登录失败",
"media.play": "开始播放",
"media.stop": "停止播放",
"PlaybackStart": "开始播放",
"PlaybackStop": "停止播放",
"item.rate": "标记了"
}
_webhook_images = {
"emby": "https://emby.media/notificationicon.png",
"plex": "https://www.plex.tv/wp-content/uploads/2022/04/new-logo-process-lines-gray.png",
"jellyfin": "https://play-lh.googleusercontent.com/SCsUK3hCCRqkJbmLDctNYCfehLxsS4ggD1ZPHIFrrAN1Tn9yhjmGMPep2D9lMaaa9eQi"
}
if not _webhook_actions.get(event_info.get('event')):
return
# 消息标题
if event_info.get('item_type') in ["TV", "SHOW"]:
message_title = f"{_webhook_actions.get(event_info.get('event'))}剧集 {event_info.get('item_name')}"
elif event_info.get('item_type') == "MOV":
message_title = f"{_webhook_actions.get(event_info.get('event'))}电影 {event_info.get('item_name')}"
elif event_info.get('item_type') == "AUD":
message_title = f"{_webhook_actions.get(event_info.get('event'))}有声书 {event_info.get('item_name')}"
else:
message_title = f"{_webhook_actions.get(event_info.get('event'))}"
# 消息内容
message_texts = []
if event_info.get('user_name'):
message_texts.append(f"用户:{event_info.get('user_name')}")
if event_info.get('device_name'):
message_texts.append(f"设备:{event_info.get('client')} {event_info.get('device_name')}")
if event_info.get('ip'):
message_texts.append(f"IP地址{event_info.get('ip')} {WebUtils.get_location(event_info.get('ip'))}")
if event_info.get('percentage'):
percentage = round(float(event_info.get('percentage')), 2)
message_texts.append(f"进度:{percentage}%")
if event_info.get('overview'):
message_texts.append(f"剧情:{event_info.get('overview')}")
message_texts.append(f"时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}")
# 消息图片
if not event_info.get("image_url"):
image_url = _webhook_images.get(event_info.get("channel"))
else:
image_url = event_info.get("image_url")
# 消息内容
message_content = "\n".join(message_texts)
# 发送消息
self.post_message(title=info.get("title"), text=info.get("text"), image=info.get("image"))
self.post_message(title=message_title, text=message_content, image=image_url)

View File

@ -29,7 +29,7 @@ class EmbyModule(_ModuleBase):
:param args: 请求参数
:return: 字典解析为消息时需要包含title、text、image
"""
return self.emby.get_webhook_message(json.loads(body))
return self.emby.get_webhook_message(form.get("data"))
def media_exists(self, mediainfo: MediaInfo) -> Optional[dict]:
"""

View File

@ -1,3 +1,4 @@
import json
import re
from pathlib import Path
from typing import List, Optional, Union, Dict
@ -438,12 +439,12 @@ class Emby(metaclass=Singleton):
logger.error(f"连接Items/Id出错" + str(e))
return {}
@staticmethod
def get_webhook_message(message: dict) -> dict:
def get_webhook_message(self, message_str: str) -> dict:
"""
解析Emby Webhook报文
"""
eventItem = {'event': message.get('Event', '')}
message = json.loads(message_str)
eventItem = {'event': message.get('Event', ''), "channel": "emby"}
if message.get('Item'):
if message.get('Item', {}).get('Type') == 'Episode':
eventItem['item_type'] = "TV"
@ -486,4 +487,10 @@ class Emby(metaclass=Singleton):
if message.get("User"):
eventItem['user_name'] = message.get("User").get('Name')
# 获取消息图片
if eventItem.get("item_id"):
# 根据返回的item_id去调用媒体服务器获取
eventItem['image_url'] = self.get_remote_image_by_id(item_id=eventItem.get('item_id'),
image_type="Backdrop")
return eventItem

View File

@ -1,3 +1,4 @@
import json
from typing import Optional, Tuple, Union, Any
from app.core.context import MediaInfo
@ -27,7 +28,7 @@ class JellyfinModule(_ModuleBase):
:param args: 请求参数
:return: 字典解析为消息时需要包含title、text、image
"""
return self.jellyfin.get_webhook_message(form.get("data"))
return self.jellyfin.get_webhook_message(json.loads(body))
def media_exists(self, mediainfo: MediaInfo) -> Optional[dict]:
"""

View File

@ -330,13 +330,20 @@ class Jellyfin(metaclass=Singleton):
logger.error(f"连接Users/Items出错" + str(e))
return {}
@staticmethod
def get_webhook_message(message: dict) -> dict:
def get_webhook_message(self, message: dict) -> dict:
"""
解析Jellyfin报文
"""
eventItem = {'event': message.get('NotificationType', ''),
'item_name': message.get('Name'),
'user_name': message.get('NotificationUsername')
'user_name': message.get('NotificationUsername'),
"channel": "jellyfin"
}
# 获取消息图片
if eventItem.get("item_id"):
# 根据返回的item_id去调用媒体服务器获取
eventItem['image_url'] = self.get_remote_image_by_id(item_id=eventItem.get('item_id'),
image_type="Backdrop")
return eventItem

View File

@ -1,3 +1,4 @@
import json
from pathlib import Path
from typing import List, Optional, Dict, Tuple
from urllib.parse import quote_plus
@ -250,8 +251,7 @@ class Plex(metaclass=Singleton):
break
return ids
@staticmethod
def get_webhook_message(message: dict) -> dict:
def get_webhook_message(self, message_str: str) -> dict:
"""
解析Plex报文
eventItem 字段的含义
@ -261,7 +261,8 @@ class Plex(metaclass=Singleton):
MOV:猪猪侠大冒险(2001)
overview 剧情描述
"""
eventItem = {'event': message.get('event', '')}
message = json.loads(message_str)
eventItem = {'event': message.get('event', ''), "channel": "plex"}
if message.get('Metadata'):
if message.get('Metadata', {}).get('type') == 'episode':
eventItem['item_type'] = "TV"
@ -295,4 +296,10 @@ class Plex(metaclass=Singleton):
if message.get('Account'):
eventItem['user_name'] = message.get("Account").get('title')
# 获取消息图片
if eventItem.get("item_id"):
# 根据返回的item_id去调用媒体服务器获取
eventItem['image_url'] = self.get_remote_image_by_id(item_id=eventItem.get('item_id'),
image_type="Backdrop")
return eventItem

View File

@ -5,6 +5,8 @@ import urllib3
from requests import Session, Response
from urllib3.exceptions import InsecureRequestWarning
from app.utils.ip import IpUtils
urllib3.disable_warnings(InsecureRequestWarning)
@ -150,7 +152,7 @@ class RequestUtils:
return None
@staticmethod
def cookie_parse(cookies_str: str, array: bool = False) -> dict:
def cookie_parse(cookies_str: str, array: bool = False) -> Union[list, dict]:
"""
解析cookie转化为字典或者数组
:param cookies_str: cookie字符串
@ -172,3 +174,29 @@ class RequestUtils:
cookiesList.append(cookies)
return cookiesList
return cookie_dict
class WebUtils:
@staticmethod
def get_location(ip):
"""
根据IP址查询真实地址
"""
if not IpUtils.is_ipv4(ip):
return ""
url = 'https://sp0.baidu.com/8aQDcjqpAAV3otqbppnN2DJv/api.php?co=&resource_id=6006&t=1529895387942&ie=utf8' \
'&oe=gbk&cb=op_aladdin_callback&format=json&tn=baidu&' \
'cb=jQuery110203920624944751099_1529894588086&_=1529894588088&query=%s' % ip
try:
r = RequestUtils().get_res(url)
if r:
r.encoding = 'gbk'
html = r.text
c1 = html.split('location":"')[1]
c2 = c1.split('","')[0]
return c2
else:
return ""
except Exception as err:
return str(err)

81
app/utils/ip.py Normal file
View File

@ -0,0 +1,81 @@
import ipaddress
import socket
from urllib.parse import urlparse
class IpUtils:
@staticmethod
def is_ipv4(ip):
"""
判断是不是ipv4
"""
try:
socket.inet_pton(socket.AF_INET, ip)
except AttributeError: # no inet_pton here,sorry
try:
socket.inet_aton(ip)
except socket.error:
return False
return ip.count('.') == 3
except socket.error: # not a valid ip
return False
return True
@staticmethod
def is_ipv6(ip):
"""
判断是不是ipv6
"""
try:
socket.inet_pton(socket.AF_INET6, ip)
except socket.error: # not a valid ip
return False
return True
@staticmethod
def is_internal(hostname):
"""
判断一个host是内网还是外网
"""
hostname = urlparse(hostname).hostname
if IpUtils.is_ip(hostname):
return IpUtils.is_private_ip(hostname)
else:
return IpUtils.is_internal_domain(hostname)
@staticmethod
def is_ip(addr):
"""
判断是不是ip
"""
try:
socket.inet_aton(addr)
return True
except socket.error:
return False
@staticmethod
def is_internal_domain(domain):
"""
判断域名是否为内部域名
"""
# 获取域名对应的 IP 地址
try:
ip = socket.gethostbyname(domain)
except socket.error:
return False
# 判断 IP 地址是否属于内网 IP 地址范围
return IpUtils.is_private_ip(ip)
@staticmethod
def is_private_ip(ip_str):
"""
判断是不是内网ip
"""
try:
return ipaddress.ip_address(ip_str.strip()).is_private
except Exception as e:
print(e)
return False