Merge pull request #38 from thsrite/main

This commit is contained in:
jxxghp 2023-08-07 13:31:12 +08:00 committed by GitHub
commit 4ce140a382
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 696 additions and 3 deletions

View File

@ -16,7 +16,7 @@ from app.core.module import ModuleManager
from app.log import logger
from app.schemas import TransferInfo, TransferTorrent, ExistMediaInfo, DownloadingTorrent, CommingMessage, Notification, \
WebhookEventInfo
from app.schemas.types import TorrentStatus, MediaType, MediaImageType
from app.schemas.types import TorrentStatus, MediaType, MediaImageType, EventType
from app.utils.object import ObjectUtils
@ -330,6 +330,15 @@ class ChainBase(metaclass=ABCMeta):
:param message: 消息体
:return: 成功或失败
"""
# 发送事件
self.eventmanager.send_event(etype=EventType.NoticeMessage,
data={
"channel": message.channel,
"title": message.title,
"text": message.text,
"image": message.image,
"userid": message.userid,
})
return self.run_module("post_message", message=message)
def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> Optional[bool]:

View File

@ -28,6 +28,11 @@ class Base:
db.query(cls).filter(cls.id == rid).delete()
db.commit()
@classmethod
def truncate(cls, db):
db.query(cls).delete()
db.commit()
@classmethod
def list(cls, db):
return db.query(cls).all()

View File

@ -60,3 +60,9 @@ class TransferHistoryOper(DbOper):
删除转移记录
"""
TransferHistory.delete(self._db, historyid)
def truncate(self):
"""
清空转移记录
"""
TransferHistory.truncate(self._db)

View File

@ -25,9 +25,9 @@ class MediaSyncDel(_PluginBase):
# 插件描述
plugin_desc = "媒体库删除媒体后同步删除历史记录或源文件。"
# 插件图标
plugin_icon = "sync.png"
plugin_icon = "mediasyncdel.png"
# 主题色
plugin_color = "#53BA47"
plugin_color = "#ff1a1a"
# 插件版本
plugin_version = "1.0"
# 插件作者

View File

@ -0,0 +1,400 @@
import json
import re
from datetime import datetime
from app.core.config import settings
from app.plugins import _PluginBase
from app.core.event import eventmanager
from app.schemas.types import EventType, MessageChannel
from app.utils.http import RequestUtils
from typing import Any, List, Dict, Tuple, Optional
from app.log import logger
class MessageForward(_PluginBase):
# 插件名称
plugin_name = "消息转发"
# 插件描述
plugin_desc = "根据正则转发通知到其他WeChat应用。"
# 插件图标
plugin_icon = "forward.png"
# 主题色
plugin_color = "#32ABD1"
# 插件版本
plugin_version = "1.0"
# 插件作者
plugin_author = "thsrite"
# 作者主页
author_url = "https://github.com/thsrite"
# 插件配置项ID前缀
plugin_config_prefix = "messageforward_"
# 加载顺序
plugin_order = 16
# 可使用的用户级别
auth_level = 2
# 私有属性
_enabled = False
_wechat = None
_pattern = None
_pattern_token = {}
# 企业微信发送消息URL
_send_msg_url = f"{settings.WECHAT_PROXY}/cgi-bin/message/send?access_token=%s"
# 企业微信获取TokenURL
_token_url = f"{settings.WECHAT_PROXY}/cgi-bin/gettoken?corpid=%s&corpsecret=%s"
def init_plugin(self, config: dict = None):
if config:
self._enabled = config.get("enabled")
self._wechat = config.get("wechat")
self._pattern = config.get("pattern")
# 获取token存库
if self._enabled and self._wechat:
self.__save_wechat_token()
def get_state(self) -> bool:
return self._enabled
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
def get_api(self) -> List[Dict[str, Any]]:
pass
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面需要返回两块数据1页面配置2数据结构
"""
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'enabled',
'label': '开启转发'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTextarea',
'props': {
'model': 'wechat',
'rows': '3',
'label': '应用配置',
'placeholder': 'appid:corpid:appsecret一行一个配置'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTextarea',
'props': {
'model': 'pattern',
'rows': '3',
'label': '正则配置',
'placeholder': '对应上方应用配置,一行一个,一一对应'
}
}
]
}
]
},
]
}
], {
"enabled": False,
"wechat": "",
"pattern": ""
}
def get_page(self) -> List[dict]:
pass
@eventmanager.register(EventType.NoticeMessage)
def send(self, event):
"""
消息转发
"""
if not self._enabled:
return
# 消息体
data = event.event_data
channel = data['channel']
if channel and channel != MessageChannel.Wechat:
return
title = data['title']
text = data['text']
image = data['image']
userid = data['userid']
# 正则匹配
patterns = self._pattern.split("\n")
for i, pattern in enumerate(patterns):
msg_match = re.search(pattern, title)
if msg_match:
access_token, appid = self.__flush_access_token(i)
if not access_token:
continue
# 发送消息
if image:
self.__send_image_message(title, text, image, userid, access_token, appid, i)
else:
self.__send_message(title, text, userid, access_token, appid, i)
def __save_wechat_token(self):
"""
获取并存储wechat token
"""
# 查询历史
wechat_token_history = self.get_data("wechat_token") or {}
# 解析配置
wechats = self._wechat.split("\n")
for i, wechat in enumerate(wechats):
wechat_config = wechat.split(":")
if len(wechat_config) != 3:
logger.error(f"{wechat} 应用配置不正确")
continue
appid = wechat_config[0]
corpid = wechat_config[1]
appsecret = wechat_config[2]
# 查询历史是否存储token
wechat_config = wechat_token_history.get("appid")
access_token = None
expires_in = None
access_token_time = None
if wechat_config:
access_token_time = wechat_config['access_token_time']
expires_in = wechat_config['expires_in']
# 判断token是否过期
if (datetime.now() - access_token_time).seconds < expires_in:
# 重新获取token
access_token, expires_in, access_token_time = self.__get_access_token(corpid=corpid,
appsecret=appsecret)
if not access_token:
# 获取token
access_token, expires_in, access_token_time = self.__get_access_token(corpid=corpid,
appsecret=appsecret)
if access_token:
wechat_token_history[appid] = {
"access_token": access_token,
"expires_in": expires_in,
"access_token_time": str(access_token_time),
"corpid": corpid,
"appsecret": appsecret
}
self._pattern_token[i] = {
"appid": appid,
"corpid": corpid,
"appsecret": appsecret,
"access_token": access_token,
"expires_in": expires_in,
"access_token_time": access_token_time,
}
else:
logger.error(f"wechat配置 appid = {appid} 获取token失败请检查配置")
# 保存wechat token
if wechat_token_history:
self.save_data("wechat_token", wechat_token_history)
def __flush_access_token(self, i: int):
"""
获取第i个配置wechat token
"""
wechat_token = self._pattern_token[i]
if not wechat_token:
logger.error(f"未获取到第 {i} 条正则对应的wechat应用token请检查配置")
return None
access_token = wechat_token['access_token']
expires_in = wechat_token['expires_in']
access_token_time = wechat_token['access_token_time']
appid = wechat_token['appid']
corpid = wechat_token['corpid']
appsecret = wechat_token['appsecret']
# 判断token有效期
if (datetime.now() - access_token_time).seconds < expires_in:
# 重新获取token
access_token, expires_in, access_token_time = self.__get_access_token(corpid=corpid,
appsecret=appsecret)
if not access_token:
logger.error(f"wechat配置 appid = {appid} 获取token失败请检查配置")
return None, None
self._pattern_token[i] = {
"appid": appid,
"corpid": corpid,
"appsecret": appsecret,
"access_token": access_token,
"expires_in": expires_in,
"access_token_time": access_token_time,
}
return access_token, appid
def __send_message(self, title: str, text: str = None, userid: str = None, access_token: str = None,
appid: str = None, i: int = None) -> \
Optional[bool]:
"""
发送文本消息
:param title: 消息标题
:param text: 消息内容
:param userid: 消息发送对象的ID为空则发给所有人
:return: 发送状态错误信息
"""
message_url = self._send_msg_url % access_token
if text:
conent = "%s\n%s" % (title, text.replace("\n\n", "\n"))
else:
conent = title
if not userid:
userid = "@all"
req_json = {
"touser": userid,
"msgtype": "text",
"agentid": appid,
"text": {
"content": conent
},
"safe": 0,
"enable_id_trans": 0,
"enable_duplicate_check": 0
}
return self.__post_request(message_url, req_json, i, title)
def __send_image_message(self, title: str, text: str, image_url: str, userid: str = None, access_token: str = None,
appid: str = None, i: int = None) -> Optional[bool]:
"""
发送图文消息
:param title: 消息标题
:param text: 消息内容
:param image_url: 图片地址
:param userid: 消息发送对象的ID为空则发给所有人
:return: 发送状态错误信息
"""
message_url = self._send_msg_url % access_token
if text:
text = text.replace("\n\n", "\n")
if not userid:
userid = "@all"
req_json = {
"touser": userid,
"msgtype": "news",
"agentid": appid,
"news": {
"articles": [
{
"title": title,
"description": text,
"picurl": image_url,
"url": ''
}
]
}
}
return self.__post_request(message_url, req_json, i, title)
def __post_request(self, message_url: str, req_json: dict, i: int, title: str) -> bool:
"""
向微信发送请求
"""
try:
res = RequestUtils(content_type='application/json').post(
message_url,
data=json.dumps(req_json, ensure_ascii=False).encode('utf-8')
)
if res and res.status_code == 200:
ret_json = res.json()
if ret_json.get('errcode') == 0:
logger.info(f"转发消息 {title} 成功")
return True
else:
if ret_json.get('errcode') == 42001:
# 重新获取token
self.__flush_access_token(i)
logger.error(f"转发消息 {title} 失败,错误信息:{ret_json}")
return False
elif res is not None:
logger.error(f"转发消息 {title} 失败,错误码:{res.status_code},错误原因:{res.reason}")
return False
else:
logger.error(f"转发消息 {title} 失败,未获取到返回信息")
return False
except Exception as err:
logger.error(f"转发消息 {title} 失败,错误信息:{err}")
return False
def __get_access_token(self, corpid, appsecret):
"""
获取微信Token
:return 微信Token
"""
try:
token_url = self._token_url % (corpid, appsecret)
res = RequestUtils().get_res(token_url)
if res:
ret_json = res.json()
if ret_json.get('errcode') == 0:
access_token = ret_json.get('access_token')
expires_in = ret_json.get('expires_in')
access_token_time = datetime.now()
return access_token, expires_in, access_token_time
else:
logger.error(f"{ret_json.get('errmsg')}")
return None, None, None
else:
logger.error(f"{corpid} {appsecret} 获取token失败")
return None, None, None
except Exception as e:
logger.error(f"获取微信access_token失败错误信息{e}")
return None, None, None
def stop_service(self):
"""
退出插件
"""
pass

View File

@ -0,0 +1,271 @@
import sqlite3
from datetime import datetime
from app.db.transferhistory_oper import TransferHistoryOper
from app.plugins import _PluginBase
from typing import Any, List, Dict, Tuple
from app.log import logger
class NAStoolSync(_PluginBase):
# 插件名称
plugin_name = "历史记录同步"
# 插件描述
plugin_desc = "同步NAStool历史记录到MoviePilot。"
# 插件图标
plugin_icon = "sync.png"
# 主题色
plugin_color = "#53BA47"
# 插件版本
plugin_version = "1.0"
# 插件作者
plugin_author = "thsrite"
# 作者主页
author_url = "https://github.com/thsrite"
# 插件配置项ID前缀
plugin_config_prefix = "nastoolsync_"
# 加载顺序
plugin_order = 15
# 可使用的用户级别
auth_level = 2
# 私有属性
_transferhistory = None
_clear = None
_nt_db_path = None
_path = None
def init_plugin(self, config: dict = None):
self._transferhistory = TransferHistoryOper()
if config:
self._clear = config.get("clear")
self._nt_db_path = config.get("nt_db_path")
self._path = config.get("path")
if self._nt_db_path:
# 导入转移历史
self.sync_transfer_history()
def sync_transfer_history(self):
"""
导入nt转移记录
"""
# 开始计时
start_time = datetime.now()
nt_historys = self.get_nt_transfer_history()
# 清空MoviePilot转移记录
if self._clear:
logger.info("MoviePilot转移记录已清空")
self._transferhistory.truncate()
# 处理数据存入mp数据库
for history in nt_historys:
msrc = history[0]
mdest = history[1]
mmode = history[2]
mtype = history[3]
mcategory = history[4]
mtitle = history[5]
myear = history[6]
mtmdbid = history[7]
mseasons = history[8]
mepisodes = history[9]
mimage = history[10]
mdownload_hash = history[11]
mdate = history[12]
# 处理路径映射
if self._path:
paths = self._path.split("\n")
for path in paths:
sub_paths = path.split(":")
msrc = msrc.replace(sub_paths[0], sub_paths[1]).replace('\\', '/')
mdest = mdest.replace(sub_paths[0], sub_paths[1]).replace('\\', '/')
# 存库
self._transferhistory.add(
src=msrc,
dest=mdest,
mode=mmode,
type=mtype,
category=mcategory,
title=mtitle,
year=myear,
tmdbid=mtmdbid,
seasons=mseasons,
episodes=mepisodes,
image=mimage,
download_hash=mdownload_hash,
date=mdate
)
logger.debug(f"{mtitle} {myear} {mtmdbid} {mseasons} {mepisodes} 已同步")
self.update_config(
{
"clear": False,
"nt_db_path": "",
"path": self._path
}
)
# 计算耗时
end_time = datetime.now()
logger.info(f"转移记录已同步完成。总耗时 {(end_time - start_time).seconds}")
def get_nt_transfer_history(self):
"""
获取nt转移记录
"""
# 读取sqlite数据
gradedb = sqlite3.connect(self._nt_db_path)
# 创建游标cursor来执行execute语句
cursor = gradedb.cursor()
sql = '''SELECT
t.SOURCE_PATH || '/' || t.SOURCE_FILENAME AS src,
t.DEST_PATH || '/' || t.DEST_FILENAME AS dest,
CASE
t.MODE
WHEN '硬链接' THEN
'link'
WHEN '移动' THEN
'move'
WHEN '复制' THEN
'copy'
END AS mode,
CASE
t.TYPE
WHEN '动漫' THEN
'电视剧' ELSE t.TYPE
END AS type,
t.CATEGORY AS category,
t.TITLE AS title,
t.YEAR AS year,
t.TMDBID AS tmdbid,
CASE
t.SEASON_EPISODE
WHEN NULL THEN
NULL ELSE substr( t.SEASON_EPISODE, 1, instr ( t.SEASON_EPISODE, ' ' ) - 1 )
END AS seasons,
CASE
t.SEASON_EPISODE
WHEN NULL THEN
NULL ELSE substr( t.SEASON_EPISODE, instr ( t.SEASON_EPISODE, ' ' ) + 1 )
END AS episodes,
d.POSTER AS image,
d.DOWNLOAD_ID AS download_hash,
t.DATE AS date
FROM
TRANSFER_HISTORY t
LEFT JOIN ( SELECT * FROM DOWNLOAD_HISTORY GROUP BY TMDBID ) d ON t.TITLE = d.TITLE
AND t.TYPE = d.TYPE;'''
cursor.execute(sql)
nt_historys = cursor.fetchall()
cursor.close()
if not nt_historys:
logger.error("未获取到NAStool数据库文件中的转移历史请检查数据库路径是正确")
return
logger.info(f"获取到NAStool转移记录 {len(nt_historys)}")
return nt_historys
def get_state(self) -> bool:
return True if self._nt_db_path else False
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
def get_api(self) -> List[Dict[str, Any]]:
pass
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面需要返回两块数据1页面配置2数据结构
"""
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'clear',
'label': '清空记录',
'placeholder': '开启会清空MoviePilot历史记录'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'nt_db_path',
'label': 'NAStool数据库user.db路径',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTextarea',
'props': {
'model': 'path',
'label': '路径映射',
'placeholder': 'NAStool路径:MoviePilot路径一行一个'
}
}
]
}
]
},
]
}
], {
"clear": False,
"nt_db_path": "",
"path": "",
}
def get_page(self) -> List[dict]:
pass
def stop_service(self):
"""
退出插件
"""
pass

View File

@ -36,6 +36,8 @@ class EventType(Enum):
MediaDeleted = "media.deleted"
# 用户外来消息
UserMessage = "user.message"
# 通知消息
NoticeMessage = "notice.message"
# 系统配置Key字典