From 14d3ff6477d1197f4c76c230ec8106825cfa9475 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Sun, 11 Jun 2023 20:44:51 +0800 Subject: [PATCH] fix webhooks --- app/chain/webhook_message.py | 65 +++++++++++++++++++++++-- app/modules/emby/__init__.py | 2 +- app/modules/emby/emby.py | 13 +++-- app/modules/jellyfin/__init__.py | 3 +- app/modules/jellyfin/jellyfin.py | 13 +++-- app/modules/plex/plex.py | 13 +++-- app/utils/http.py | 30 +++++++++++- app/utils/ip.py | 81 ++++++++++++++++++++++++++++++++ 8 files changed, 205 insertions(+), 15 deletions(-) create mode 100644 app/utils/ip.py diff --git a/app/chain/webhook_message.py b/app/chain/webhook_message.py index 6ff032e1..41fa089c 100644 --- a/app/chain/webhook_message.py +++ b/app/chain/webhook_message.py @@ -1,6 +1,8 @@ +import time from typing import Any from app.chain import ChainBase +from app.utils.http import WebUtils class WebhookMessageChain(ChainBase): @@ -13,8 +15,65 @@ class WebhookMessageChain(ChainBase): 处理Webhook报文并发送消息 """ # 获取主体内容 - info: dict = self.webhook_parser(body=body, form=form, args=args) - if not info: + event_info: dict = self.webhook_parser(body=body, form=form, args=args) + if not event_info: return + # 拼装消息内容 + _webhook_actions = { + "library.new": "新入库", + "system.webhooktest": "测试", + "playback.start": "开始播放", + "playback.stop": "停止播放", + "user.authenticated": "登录成功", + "user.authenticationfailed": "登录失败", + "media.play": "开始播放", + "media.stop": "停止播放", + "PlaybackStart": "开始播放", + "PlaybackStop": "停止播放", + "item.rate": "标记了" + } + _webhook_images = { + "emby": "https://emby.media/notificationicon.png", + "plex": "https://www.plex.tv/wp-content/uploads/2022/04/new-logo-process-lines-gray.png", + "jellyfin": "https://play-lh.googleusercontent.com/SCsUK3hCCRqkJbmLDctNYCfehLxsS4ggD1ZPHIFrrAN1Tn9yhjmGMPep2D9lMaaa9eQi" + } + + if not _webhook_actions.get(event_info.get('event')): + return + + # 消息标题 + if event_info.get('item_type') in ["TV", "SHOW"]: + message_title = f"{_webhook_actions.get(event_info.get('event'))}剧集 {event_info.get('item_name')}" + elif event_info.get('item_type') == "MOV": + message_title = f"{_webhook_actions.get(event_info.get('event'))}电影 {event_info.get('item_name')}" + elif event_info.get('item_type') == "AUD": + message_title = f"{_webhook_actions.get(event_info.get('event'))}有声书 {event_info.get('item_name')}" + else: + message_title = f"{_webhook_actions.get(event_info.get('event'))}" + + # 消息内容 + message_texts = [] + if event_info.get('user_name'): + message_texts.append(f"用户:{event_info.get('user_name')}") + if event_info.get('device_name'): + message_texts.append(f"设备:{event_info.get('client')} {event_info.get('device_name')}") + if event_info.get('ip'): + message_texts.append(f"IP地址:{event_info.get('ip')} {WebUtils.get_location(event_info.get('ip'))}") + if event_info.get('percentage'): + percentage = round(float(event_info.get('percentage')), 2) + message_texts.append(f"进度:{percentage}%") + if event_info.get('overview'): + message_texts.append(f"剧情:{event_info.get('overview')}") + message_texts.append(f"时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}") + + # 消息图片 + if not event_info.get("image_url"): + image_url = _webhook_images.get(event_info.get("channel")) + else: + image_url = event_info.get("image_url") + + # 消息内容 + message_content = "\n".join(message_texts) + # 发送消息 - self.post_message(title=info.get("title"), text=info.get("text"), image=info.get("image")) + self.post_message(title=message_title, text=message_content, image=image_url) diff --git a/app/modules/emby/__init__.py b/app/modules/emby/__init__.py index f9e85893..90badafe 100644 --- a/app/modules/emby/__init__.py +++ b/app/modules/emby/__init__.py @@ -29,7 +29,7 @@ class EmbyModule(_ModuleBase): :param args: 请求参数 :return: 字典,解析为消息时需要包含:title、text、image """ - return self.emby.get_webhook_message(json.loads(body)) + return self.emby.get_webhook_message(form.get("data")) def media_exists(self, mediainfo: MediaInfo) -> Optional[dict]: """ diff --git a/app/modules/emby/emby.py b/app/modules/emby/emby.py index 1f05197d..2e690bf3 100644 --- a/app/modules/emby/emby.py +++ b/app/modules/emby/emby.py @@ -1,3 +1,4 @@ +import json import re from pathlib import Path from typing import List, Optional, Union, Dict @@ -438,12 +439,12 @@ class Emby(metaclass=Singleton): logger.error(f"连接Items/Id出错:" + str(e)) return {} - @staticmethod - def get_webhook_message(message: dict) -> dict: + def get_webhook_message(self, message_str: str) -> dict: """ 解析Emby Webhook报文 """ - eventItem = {'event': message.get('Event', '')} + message = json.loads(message_str) + eventItem = {'event': message.get('Event', ''), "channel": "emby"} if message.get('Item'): if message.get('Item', {}).get('Type') == 'Episode': eventItem['item_type'] = "TV" @@ -486,4 +487,10 @@ class Emby(metaclass=Singleton): if message.get("User"): eventItem['user_name'] = message.get("User").get('Name') + # 获取消息图片 + if eventItem.get("item_id"): + # 根据返回的item_id去调用媒体服务器获取 + eventItem['image_url'] = self.get_remote_image_by_id(item_id=eventItem.get('item_id'), + image_type="Backdrop") + return eventItem diff --git a/app/modules/jellyfin/__init__.py b/app/modules/jellyfin/__init__.py index 8820bda8..9358330c 100644 --- a/app/modules/jellyfin/__init__.py +++ b/app/modules/jellyfin/__init__.py @@ -1,3 +1,4 @@ +import json from typing import Optional, Tuple, Union, Any from app.core.context import MediaInfo @@ -27,7 +28,7 @@ class JellyfinModule(_ModuleBase): :param args: 请求参数 :return: 字典,解析为消息时需要包含:title、text、image """ - return self.jellyfin.get_webhook_message(form.get("data")) + return self.jellyfin.get_webhook_message(json.loads(body)) def media_exists(self, mediainfo: MediaInfo) -> Optional[dict]: """ diff --git a/app/modules/jellyfin/jellyfin.py b/app/modules/jellyfin/jellyfin.py index db55cd57..e472a2a8 100644 --- a/app/modules/jellyfin/jellyfin.py +++ b/app/modules/jellyfin/jellyfin.py @@ -330,13 +330,20 @@ class Jellyfin(metaclass=Singleton): logger.error(f"连接Users/Items出错:" + str(e)) return {} - @staticmethod - def get_webhook_message(message: dict) -> dict: + def get_webhook_message(self, message: dict) -> dict: """ 解析Jellyfin报文 """ eventItem = {'event': message.get('NotificationType', ''), 'item_name': message.get('Name'), - 'user_name': message.get('NotificationUsername') + 'user_name': message.get('NotificationUsername'), + "channel": "jellyfin" } + + # 获取消息图片 + if eventItem.get("item_id"): + # 根据返回的item_id去调用媒体服务器获取 + eventItem['image_url'] = self.get_remote_image_by_id(item_id=eventItem.get('item_id'), + image_type="Backdrop") + return eventItem diff --git a/app/modules/plex/plex.py b/app/modules/plex/plex.py index 5a85a004..b91a5169 100644 --- a/app/modules/plex/plex.py +++ b/app/modules/plex/plex.py @@ -1,3 +1,4 @@ +import json from pathlib import Path from typing import List, Optional, Dict, Tuple from urllib.parse import quote_plus @@ -250,8 +251,7 @@ class Plex(metaclass=Singleton): break return ids - @staticmethod - def get_webhook_message(message: dict) -> dict: + def get_webhook_message(self, message_str: str) -> dict: """ 解析Plex报文 eventItem 字段的含义 @@ -261,7 +261,8 @@ class Plex(metaclass=Singleton): MOV:猪猪侠大冒险(2001) overview 剧情描述 """ - eventItem = {'event': message.get('event', '')} + message = json.loads(message_str) + eventItem = {'event': message.get('event', ''), "channel": "plex"} if message.get('Metadata'): if message.get('Metadata', {}).get('type') == 'episode': eventItem['item_type'] = "TV" @@ -295,4 +296,10 @@ class Plex(metaclass=Singleton): if message.get('Account'): eventItem['user_name'] = message.get("Account").get('title') + # 获取消息图片 + if eventItem.get("item_id"): + # 根据返回的item_id去调用媒体服务器获取 + eventItem['image_url'] = self.get_remote_image_by_id(item_id=eventItem.get('item_id'), + image_type="Backdrop") + return eventItem diff --git a/app/utils/http.py b/app/utils/http.py index ad9b5393..0f15cf7a 100644 --- a/app/utils/http.py +++ b/app/utils/http.py @@ -5,6 +5,8 @@ import urllib3 from requests import Session, Response from urllib3.exceptions import InsecureRequestWarning +from app.utils.ip import IpUtils + urllib3.disable_warnings(InsecureRequestWarning) @@ -150,7 +152,7 @@ class RequestUtils: return None @staticmethod - def cookie_parse(cookies_str: str, array: bool = False) -> dict: + def cookie_parse(cookies_str: str, array: bool = False) -> Union[list, dict]: """ 解析cookie,转化为字典或者数组 :param cookies_str: cookie字符串 @@ -172,3 +174,29 @@ class RequestUtils: cookiesList.append(cookies) return cookiesList return cookie_dict + + +class WebUtils: + + @staticmethod + def get_location(ip): + """ + 根据IP址查询真实地址 + """ + if not IpUtils.is_ipv4(ip): + return "" + url = 'https://sp0.baidu.com/8aQDcjqpAAV3otqbppnN2DJv/api.php?co=&resource_id=6006&t=1529895387942&ie=utf8' \ + '&oe=gbk&cb=op_aladdin_callback&format=json&tn=baidu&' \ + 'cb=jQuery110203920624944751099_1529894588086&_=1529894588088&query=%s' % ip + try: + r = RequestUtils().get_res(url) + if r: + r.encoding = 'gbk' + html = r.text + c1 = html.split('location":"')[1] + c2 = c1.split('","')[0] + return c2 + else: + return "" + except Exception as err: + return str(err) diff --git a/app/utils/ip.py b/app/utils/ip.py new file mode 100644 index 00000000..b9cd110a --- /dev/null +++ b/app/utils/ip.py @@ -0,0 +1,81 @@ +import ipaddress +import socket +from urllib.parse import urlparse + + +class IpUtils: + + @staticmethod + def is_ipv4(ip): + """ + 判断是不是ipv4 + """ + try: + socket.inet_pton(socket.AF_INET, ip) + except AttributeError: # no inet_pton here,sorry + try: + socket.inet_aton(ip) + except socket.error: + return False + return ip.count('.') == 3 + except socket.error: # not a valid ip + return False + return True + + @staticmethod + def is_ipv6(ip): + """ + 判断是不是ipv6 + """ + try: + socket.inet_pton(socket.AF_INET6, ip) + except socket.error: # not a valid ip + return False + return True + + @staticmethod + def is_internal(hostname): + """ + 判断一个host是内网还是外网 + """ + hostname = urlparse(hostname).hostname + if IpUtils.is_ip(hostname): + return IpUtils.is_private_ip(hostname) + else: + return IpUtils.is_internal_domain(hostname) + + @staticmethod + def is_ip(addr): + """ + 判断是不是ip + """ + try: + socket.inet_aton(addr) + return True + except socket.error: + return False + + @staticmethod + def is_internal_domain(domain): + """ + 判断域名是否为内部域名 + """ + # 获取域名对应的 IP 地址 + try: + ip = socket.gethostbyname(domain) + except socket.error: + return False + + # 判断 IP 地址是否属于内网 IP 地址范围 + return IpUtils.is_private_ip(ip) + + @staticmethod + def is_private_ip(ip_str): + """ + 判断是不是内网ip + """ + try: + return ipaddress.ip_address(ip_str.strip()).is_private + except Exception as e: + print(e) + return False