feat wechat消息转发插件--根据正则转发通知到其他wechat应用,解决只能配置一个wechat通知问题

This commit is contained in:
thsrite 2023-08-07 12:05:45 +08:00
parent 69fc865908
commit af64f1829e
3 changed files with 408 additions and 0 deletions

View File

@ -3,11 +3,13 @@ from typing import Optional, Union, List, Tuple, Any
from app.core.config import settings
from app.core.context import Context, MediaInfo
from app.core.event import EventManager
from app.log import logger
from app.modules import _ModuleBase, checkMessage
from app.modules.wechat.WXBizMsgCrypt3 import WXBizMsgCrypt
from app.modules.wechat.wechat import WeChat
from app.schemas import MessageChannel, CommingMessage, Notification
from app.schemas.types import EventType
from app.utils.dom import DomUtils
@ -121,6 +123,14 @@ class WechatModule(_ModuleBase):
:param message: 消息内容
:return: 成功或失败
"""
# 发送事件
EventManager().send_event(etype=EventType.WechatMessage,
data={
"title": message.title,
"text": message.text,
"image": message.image,
"userid": message.userid,
})
return self.wechat.send_msg(title=message.title, text=message.text,
image=message.image, userid=message.userid)

View File

@ -0,0 +1,396 @@
import json
import re
from datetime import datetime
from app.core.config import settings
from app.plugins import _PluginBase
from app.core.event import eventmanager
from app.schemas.types import EventType
from app.utils.http import RequestUtils
from typing import Any, List, Dict, Tuple, Optional
from app.log import logger
class MessageForward(_PluginBase):
# 插件名称
plugin_name = "wechat消息转发"
# 插件描述
plugin_desc = "根据正则转发通知到其他wechat应用。"
# 插件图标
plugin_icon = "forward.png"
# 主题色
plugin_color = "#32ABD1"
# 插件版本
plugin_version = "1.0"
# 插件作者
plugin_author = "thsrite"
# 作者主页
author_url = "https://github.com/thsrite"
# 插件配置项ID前缀
plugin_config_prefix = "messageforward_"
# 加载顺序
plugin_order = 16
# 可使用的用户级别
auth_level = 2
# 私有属性
_enabled = False
_wechat = None
_pattern = None
_pattern_token = {}
# 企业微信发送消息URL
_send_msg_url = f"{settings.WECHAT_PROXY}/cgi-bin/message/send?access_token=%s"
# 企业微信获取TokenURL
_token_url = f"{settings.WECHAT_PROXY}/cgi-bin/gettoken?corpid=%s&corpsecret=%s"
def init_plugin(self, config: dict = None):
if config:
self._enabled = config.get("enabled")
self._wechat = config.get("wechat")
self._pattern = config.get("pattern")
# 获取token存库
if self._enabled and self._wechat:
self.__save_wechat_token()
def get_state(self) -> bool:
return self._enabled
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
def get_api(self) -> List[Dict[str, Any]]:
pass
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面需要返回两块数据1页面配置2数据结构
"""
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'enabled',
'label': '开启转发'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTextarea',
'props': {
'model': 'wechat',
'rows': '3',
'label': 'wechat应用配置',
'placeholder': 'appid:corpid:appsecret一行一个配置'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTextarea',
'props': {
'model': 'pattern',
'rows': '3',
'label': '正则配置',
'placeholder': '对应上方wechat配置一行一个一一对应'
}
}
]
}
]
},
]
}
], {
"enabled": False,
"wechat": "",
"pattern": ""
}
def get_page(self) -> List[dict]:
pass
@eventmanager.register(EventType.WechatMessage)
def send(self, event):
"""
向第三方Webhook发送请求
"""
if not self._enabled:
return
# 消息体
data = event.event_data
title = data['title']
text = data['text']
image = data['image']
userid = data['userid']
# 正则匹配
patterns = self._pattern.split("\n")
for i, pattern in enumerate(patterns):
msg_match = re.search(pattern, title)
if msg_match:
access_token, appid = self.__flush_access_token(i)
if not access_token:
continue
# 发送消息
if image:
self.__send_image_message(title, text, image, userid, access_token, appid, i)
else:
self.__send_message(title, text, userid, access_token, appid, i)
def __save_wechat_token(self):
"""
获取并存储wechat token
"""
# 查询历史
wechat_token_history = self.get_data("wechat_token") or {}
# 解析配置
wechats = self._wechat.split("\n")
for i, wechat in enumerate(wechats):
wechat_config = wechat.split(":")
if len(wechat_config) != 3:
logger.error(f"{wechat} 应用配置不正确")
continue
appid = wechat_config[0]
corpid = wechat_config[1]
appsecret = wechat_config[2]
# 查询历史是否存储token
wechat_config = wechat_token_history.get("appid")
access_token = None
expires_in = None
access_token_time = None
if wechat_config:
access_token_time = wechat_config['access_token_time']
expires_in = wechat_config['expires_in']
# 判断token是否过期
if (datetime.now() - access_token_time).seconds < expires_in:
# 重新获取token
access_token, expires_in, access_token_time = self.__get_access_token(corpid=corpid,
appsecret=appsecret)
if not access_token:
# 获取token
access_token, expires_in, access_token_time = self.__get_access_token(corpid=corpid,
appsecret=appsecret)
if access_token:
wechat_token_history[appid] = {
"access_token": access_token,
"expires_in": expires_in,
"access_token_time": str(access_token_time),
"corpid": corpid,
"appsecret": appsecret
}
self._pattern_token[i] = {
"appid": appid,
"corpid": corpid,
"appsecret": appsecret,
"access_token": access_token,
"expires_in": expires_in,
"access_token_time": access_token_time,
}
else:
logger.error(f"wechat配置 appid = {appid} 获取token失败请检查配置")
# 保存wechat token
if wechat_token_history:
self.save_data("wechat_token", wechat_token_history)
def __flush_access_token(self, i: int):
"""
获取第i个配置wechat token
"""
wechat_token = self._pattern_token[i]
if not wechat_token:
logger.error(f"未获取到第 {i} 条正则对应的wechat应用token请检查配置")
return None
access_token = wechat_token['access_token']
expires_in = wechat_token['expires_in']
access_token_time = wechat_token['access_token_time']
appid = wechat_token['appid']
corpid = wechat_token['corpid']
appsecret = wechat_token['appsecret']
# 判断token有效期
if (datetime.now() - access_token_time).seconds < expires_in:
# 重新获取token
access_token, expires_in, access_token_time = self.__get_access_token(corpid=corpid,
appsecret=appsecret)
if not access_token:
logger.error(f"wechat配置 appid = {appid} 获取token失败请检查配置")
return None, None
self._pattern_token[i] = {
"appid": appid,
"corpid": corpid,
"appsecret": appsecret,
"access_token": access_token,
"expires_in": expires_in,
"access_token_time": access_token_time,
}
return access_token, appid
def __send_message(self, title: str, text: str = None, userid: str = None, access_token: str = None,
appid: str = None, i: int = None) -> \
Optional[bool]:
"""
发送文本消息
:param title: 消息标题
:param text: 消息内容
:param userid: 消息发送对象的ID为空则发给所有人
:return: 发送状态错误信息
"""
message_url = self._send_msg_url % access_token
if text:
conent = "%s\n%s" % (title, text.replace("\n\n", "\n"))
else:
conent = title
if not userid:
userid = "@all"
req_json = {
"touser": userid,
"msgtype": "text",
"agentid": appid,
"text": {
"content": conent
},
"safe": 0,
"enable_id_trans": 0,
"enable_duplicate_check": 0
}
return self.__post_request(message_url, req_json, i, title)
def __send_image_message(self, title: str, text: str, image_url: str, userid: str = None, access_token: str = None,
appid: str = None, i: int = None) -> Optional[bool]:
"""
发送图文消息
:param title: 消息标题
:param text: 消息内容
:param image_url: 图片地址
:param userid: 消息发送对象的ID为空则发给所有人
:return: 发送状态错误信息
"""
message_url = self._send_msg_url % access_token
if text:
text = text.replace("\n\n", "\n")
if not userid:
userid = "@all"
req_json = {
"touser": userid,
"msgtype": "news",
"agentid": appid,
"news": {
"articles": [
{
"title": title,
"description": text,
"picurl": image_url,
"url": ''
}
]
}
}
return self.__post_request(message_url, req_json, i, title)
def __post_request(self, message_url: str, req_json: dict, i: int, title: str) -> bool:
"""
向微信发送请求
"""
try:
res = RequestUtils(content_type='application/json').post(
message_url,
data=json.dumps(req_json, ensure_ascii=False).encode('utf-8')
)
if res and res.status_code == 200:
ret_json = res.json()
if ret_json.get('errcode') == 0:
logger.info(f"转发消息 {title} 成功")
return True
else:
if ret_json.get('errcode') == 42001:
# 重新获取token
self.__flush_access_token(i)
logger.error(f"转发消息 {title} 失败,错误信息:{ret_json}")
return False
elif res is not None:
logger.error(f"转发消息 {title} 失败,错误码:{res.status_code},错误原因:{res.reason}")
return False
else:
logger.error(f"转发消息 {title} 失败,未获取到返回信息")
return False
except Exception as err:
logger.error(f"转发消息 {title} 失败,错误信息:{err}")
return False
def __get_access_token(self, corpid, appsecret):
"""
获取微信Token
:return 微信Token
"""
try:
token_url = self._token_url % (corpid, appsecret)
res = RequestUtils().get_res(token_url)
if res:
ret_json = res.json()
if ret_json.get('errcode') == 0:
access_token = ret_json.get('access_token')
expires_in = ret_json.get('expires_in')
access_token_time = datetime.now()
return access_token, expires_in, access_token_time
else:
logger.error(f"{ret_json.get('errmsg')}")
return None, None, None
else:
logger.error(f"{corpid} {appsecret} 获取token失败")
return None, None, None
except Exception as e:
logger.error(f"获取微信access_token失败错误信息{e}")
return None, None, None
def stop_service(self):
"""
退出插件
"""
pass

View File

@ -36,6 +36,8 @@ class EventType(Enum):
MediaDeleted = "media.deleted"
# 用户外来消息
UserMessage = "user.message"
# wechat消息
WechatMessage = "wechat.message"
# 系统配置Key字典