feat 多通知渠道支持

This commit is contained in:
jxxghp
2023-07-14 13:05:58 +08:00
parent c1e8b6d0ff
commit 6d2f4697b0
22 changed files with 359 additions and 255 deletions

View File

@ -1,13 +1,13 @@
import xml.dom.minidom
from typing import Optional, Union, List, Tuple, Any
from app.core.context import MediaInfo, Context
from app.core.config import settings
from app.core.context import Context, MediaInfo
from app.log import logger
from app.modules import _ModuleBase
from app.modules import _ModuleBase, checkMessage
from app.modules.wechat.WXBizMsgCrypt3 import WXBizMsgCrypt
from app.modules.wechat.wechat import WeChat
from app.schemas import MessageChannel, CommingMessage
from app.schemas import MessageChannel, CommingMessage, Notification
from app.utils.dom import DomUtils
@ -114,41 +114,35 @@ class WechatModule(_ModuleBase):
logger.error(f"微信消息处理发生错误:{err}")
return None
def post_message(self, title: str,
text: str = None, image: str = None, userid: Union[str, int] = None) -> Optional[bool]:
@checkMessage(MessageChannel.Wechat)
def post_message(self, message: Notification) -> Optional[bool]:
"""
发送消息
:param title: 标题
:param text: 内容
:param image: 图片
:param userid: 用户ID
:param message: 消息内容
:return: 成功或失败
"""
return self.wechat.send_msg(title=title, text=text, image=image, userid=userid)
return self.wechat.send_msg(title=message.title, text=message.text,
image=message.image, userid=message.userid)
def post_medias_message(self, title: str, items: List[MediaInfo],
userid: Union[str, int] = None) -> Optional[bool]:
@checkMessage(MessageChannel.Wechat)
def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> Optional[bool]:
"""
发送媒体信息选择列表
:param title: 标题
:param items: 消息列表
:param userid: 用户ID
:param message: 消息内容
:param medias: 媒体列表
:return: 成功或失败
"""
# 先发送标题
self.wechat.send_msg(title=title)
self.wechat.send_msg(title=message.title)
# 再发送内容
return self.wechat.send_medias_msg(medias=items, userid=userid)
return self.wechat.send_medias_msg(medias=medias, userid=message.userid)
def post_torrents_message(self, title: str, items: List[Context],
mediainfo: MediaInfo,
userid: Union[str, int] = None) -> Optional[bool]:
@checkMessage(MessageChannel.Wechat)
def post_torrents_message(self, message: Notification, torrents: List[Context]) -> Optional[bool]:
"""
发送种子信息选择列表
:param title: 标题
:param items: 消息列表
:param mediainfo: 媒体信息
:param userid: 用户ID
:param message: 消息内容
:param torrents: 种子列表
:return: 成功或失败
"""
return self.wechat.send_torrents_msg(title=title, torrents=items, mediainfo=mediainfo, userid=userid)
return self.wechat.send_torrents_msg(title=message.title, torrents=torrents, userid=message.userid)