2023-09-22 16:04:04 +08:00

204 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import re
from typing import Optional, List
from urllib.parse import quote
from threading import Lock
from app.core.config import settings
from app.core.context import MediaInfo, Context
from app.core.metainfo import MetaInfo
from app.log import logger
from app.utils.http import RequestUtils
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
# Module-level lock object.
# NOTE(review): nothing in the visible part of this file acquires it — confirm whether it is still needed.
lock = Lock()
class SynologyChat(metaclass=Singleton):
    """
    Client for a Synology Chat bot.

    Messages are pushed through the webhook URL taken from
    ``settings.SYNOLOGYCHAT_WEBHOOK``; ``settings.SYNOLOGYCHAT_TOKEN`` is used
    both to query the bot's user list and to validate tokens passed to
    ``check_token``.
    """

    def __init__(self):
        self._req = RequestUtils(content_type="application/x-www-form-urlencoded")
        self._webhook_url = settings.SYNOLOGYCHAT_WEBHOOK
        self._token = settings.SYNOLOGYCHAT_TOKEN
        # Always define the attribute so that reading it later can never raise
        # AttributeError when no webhook is configured.
        self._domain = None
        if self._webhook_url:
            self._domain = StringUtils.get_base_url(self._webhook_url)

    def check_token(self, token: str) -> bool:
        """
        Check whether ``token`` matches the configured token.

        When no token is configured nothing matches, so an empty or missing
        token is never accepted.

        :param token: token to verify
        :return: True only if a token is configured and equals ``token``
        """
        return bool(self._token) and token == self._token

    def send_msg(self, title: str, text: str = "", image: str = "", userid: str = "") -> Optional[bool]:
        """
        Send a single message through the Synology Chat webhook.

        :param title: message title; if it spans several lines, the lines after
                      the first are placed in front of ``text``
        :param text: message body
        :param image: URL of an image to attach (sent as ``file_url``)
        :param userid: when given, only this user receives the message;
                       otherwise every user returned by the bot's user list does
        :return: True if the request succeeded, False otherwise
        """
        if not title and not text:
            logger.error("标题和内容不能同时为空")
            return False
        if not self._webhook_url or not self._token:
            return False
        try:
            # Assemble the message: only the first title line stays the title
            first_line, *extra_lines = str(title).split('\n')
            if extra_lines:
                title = first_line
                extra = "\n".join(extra_lines)
                text = f"{extra}\n{text}" if text else extra
            if text:
                body = text.replace("\n\n", "\n")
                # Without a title there is nothing to render in bold
                caption = "*%s*\n%s" % (title, body) if title else body
            else:
                caption = title
            # NOTE(review): values are percent-encoded individually while the
            # JSON envelope is sent as-is — confirm against the webhook docs.
            payload_data = {'text': quote(caption)}
            if image:
                payload_data['file_url'] = quote(image)
            userids = self.__get_target_users(userid)
            if not userids:
                logger.error("SynologyChat机器人没有对任何用户可见")
                return False
            payload_data['user_ids'] = userids
            return self.__send_request(payload_data)
        except Exception as msg_e:
            logger.error(f"SynologyChat发送消息错误{str(msg_e)}")
            return False

    def send_meidas_msg(self, medias: List[MediaInfo], userid: str = "", title: str = "") -> Optional[bool]:
        """
        Send a numbered list of media items as one message.

        The image of the first media item that provides one is attached to the
        message as ``file_url``.

        :param medias: media items to list
        :param userid: when given, only this user receives the message;
                       otherwise every user returned by the bot's user list does
        :param title: heading of the list; required
        :return: True if the request succeeded, False otherwise
        """
        if not medias:
            return False
        if not self._webhook_url or not self._token:
            return False
        try:
            if not title or not isinstance(medias, list):
                return False
            image = ""
            lines = ["*%s*" % title]
            for index, media in enumerate(medias, start=1):
                if not image:
                    image = media.get_message_image()
                details = f"类型:{media.type.value}"
                if media.vote_average:
                    details += f"评分:{media.vote_average}"
                lines.append(f"{index}. <{media.detail_link}|{media.title_year}>\n_{details}_")
            userids = self.__get_target_users(userid)
            if not userids:
                logger.error("SynologyChat机器人没有对任何用户可见")
                return False
            payload_data = {
                "text": quote("\n".join(lines)),
                "user_ids": userids
            }
            if image:
                payload_data["file_url"] = quote(image)
            return self.__send_request(payload_data)
        except Exception as msg_e:
            logger.error(f"SynologyChat发送消息错误{str(msg_e)}")
            return False

    def send_torrents_msg(self, torrents: List[Context],
                          userid: str = "", title: str = "") -> Optional[bool]:
        """
        Send a numbered list of torrents as one message.

        :param torrents: contexts whose ``torrent_info`` is listed
        :param userid: when given, only this user receives the message;
                       otherwise every user returned by the bot's user list does
        :param title: heading of the list
        :return: None when webhook or token is not configured, True if the
                 request succeeded, False otherwise
        """
        if not self._webhook_url or not self._token:
            return None
        if not torrents:
            return False
        try:
            lines = ["*%s*" % title]
            for index, context in enumerate(torrents, start=1):
                torrent = context.torrent_info
                meta = MetaInfo(torrent.title, torrent.description)
                # Skip empty parts so a missing field never shows up as "None"
                name_parts = (meta.season_episode, meta.resource_term,
                              meta.video_term, meta.release_group)
                torrent_title = re.sub(r"\s+", " ",
                                       " ".join(str(part) for part in name_parts if part)).strip()
                entry = f"{index}.【{torrent.site_name}】<{torrent.page_url}|{torrent_title}> " \
                        f"{StringUtils.str_filesize(torrent.size)} {torrent.volume_factor} {torrent.seeders}"
                if torrent.description:
                    entry = f"{entry}\n_{torrent.description}_"
                lines.append(entry)
            userids = self.__get_target_users(userid)
            if not userids:
                logger.error("SynologyChat机器人没有对任何用户可见")
                return False
            payload_data = {
                "text": quote("\n".join(lines)),
                "user_ids": userids
            }
            return self.__send_request(payload_data)
        except Exception as msg_e:
            logger.error(f"SynologyChat发送消息错误{str(msg_e)}")
            return False

    def __get_target_users(self, userid: str = "") -> list:
        """
        Resolve the recipients of a message.

        :param userid: explicit recipient; converted with ``int`` (a
                       non-numeric value raises ValueError)
        :return: ``[int(userid)]`` when ``userid`` is given, otherwise the
                 result of the bot user list query (possibly empty)
        """
        if userid:
            return [int(userid)]
        return self.__get_bot_users()

    def __get_bot_users(self):
        """
        Query the ids of the users returned by the ``user_list`` API method.

        :return: list of ``user_id`` values; empty when domain or token is
                 missing, or when the request does not return HTTP 200
        """
        if not self._domain or not self._token:
            return []
        req_url = f"{self._domain}" \
                  f"/webapi/entry.cgi?api=SYNO.Chat.External&method=user_list&version=2&token=" \
                  f"{self._token}"
        ret = self._req.get_res(url=req_url)
        if not ret or ret.status_code != 200:
            return []
        # "data"/"users" may be present but null, so fall back explicitly
        data = (ret.json() or {}).get("data") or {}
        users = data.get("users") or []
        return [user.get("user_id") for user in users if user.get("user_id") is not None]

    def __send_request(self, payload_data):
        """
        POST ``payload_data`` to the webhook as a ``payload=<json>`` form body.

        :param payload_data: dict serialized with ``json.dumps``
        :return: True when the response is HTTP 200 and carries no error code,
                 False otherwise (the reason is logged)
        """
        payload = f"payload={json.dumps(payload_data)}"
        ret = self._req.post_res(url=self._webhook_url, data=payload)
        if ret is None:
            logger.error("SynologyChat请求失败未获取到返回信息")
            return False
        if ret.status_code != 200:
            logger.error(f"SynologyChat请求失败错误码{ret.status_code},错误原因:{ret.reason}")
            return False
        result = ret.json()
        if not result:
            logger.error(f"SynologyChat返回{ret.text}")
            return False
        # "error" may be present but null, so fall back explicitly
        error = result.get('error') or {}
        errno = error.get('code')
        if not errno:
            return True
        logger.error(f"SynologyChat返回错误{errno}-{error.get('errors')}")
        return False