2023-10-26 14:34:58 +08:00

429 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import shutil
import time
from pathlib import Path
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.log import logger
from app.plugins import _PluginBase
from typing import Any, List, Dict, Tuple
from app.schemas.types import EventType, MediaImageType, NotificationType, MediaType
from app.utils.system import SystemUtils
class CloudDiskDel(_PluginBase):
# 插件名称
plugin_name = "云盘文件删除"
# 插件描述
plugin_desc = "媒体库删除strm文件后同步删除云盘资源。"
# 插件图标
plugin_icon = "clouddisk.png"
# 主题色
plugin_color = "#ff6600"
# 插件版本
plugin_version = "1.0"
# 插件作者
plugin_author = "thsrite"
# 作者主页
author_url = "https://github.com/thsrite"
# 插件配置项ID前缀
plugin_config_prefix = "clouddiskdel_"
# 加载顺序
plugin_order = 26
# 可使用的用户级别
auth_level = 1
# 私有属性
_enabled = False
# 任务执行间隔
_paths = {}
_notify = False
def init_plugin(self, config: dict = None):
if config:
self._enabled = config.get("enabled")
self._notify = config.get("notify")
for path in str(config.get("path")).split("\n"):
paths = path.split(":")
self._paths[paths[0]] = paths[1]
@eventmanager.register(EventType.NetworkDiskDel)
def clouddisk_del(self, event: Event):
if not self._enabled:
return
event_data = event.event_data
logger.info(f"获取到云盘删除请求 {event_data}")
media_path = event_data.get("media_path")
if not media_path:
logger.error("未获取到删除路径")
return
media_name = event_data.get("media_name")
tmdb_id = event_data.get("tmdb_id")
media_type = event_data.get("media_type")
season_num = event_data.get("season_num")
episode_num = event_data.get("episode_num")
# 判断删除媒体路径是否与配置的媒体库路径相符,相符则继续删除,不符则跳过
for library_path in list(self._paths.keys()):
if str(media_path).startswith(library_path):
# 替换网盘路径
media_path = str(media_path).replace(library_path, self._paths.get(library_path))
logger.info(f"获取到moviepilot本地云盘挂载路径 {media_path}")
path = Path(media_path)
if path.is_file() or media_path.endswith(".strm"):
# 删除文件、nfo、jpg等同名文件
pattern = path.stem.replace('[', '?').replace(']', '?')
logger.info(f"开始筛选同名文件 {pattern}")
files = path.parent.glob(f"{pattern}.*")
for file in files:
Path(file).unlink()
logger.info(f"云盘文件 {file} 已删除")
else:
# 非根目录,才删除目录
shutil.rmtree(path)
# 删除目录
logger.warn(f"云盘目录 {path} 已删除")
# 判断当前媒体父路径下是否有媒体文件,如有则无需遍历父级
if not SystemUtils.exits_files(path.parent, settings.RMT_MEDIAEXT):
# 判断父目录是否为空, 为空则删除
for parent_path in path.parents:
if str(parent_path.parent) != str(path.root):
# 父目录非根目录,才删除父目录
if not SystemUtils.exits_files(parent_path, settings.RMT_MEDIAEXT):
# 当前路径下没有媒体文件则删除
shutil.rmtree(parent_path)
logger.warn(f"云盘目录 {parent_path} 已删除")
break
# 发送消息
image = 'https://emby.media/notificationicon.png'
media_type = MediaType.MOVIE if media_type in ["Movie", "MOV"] else MediaType.TV
if self._notify:
specific_image = self.chain.obtain_specific_image(
mediaid=tmdb_id,
mtype=media_type,
image_type=MediaImageType.Backdrop,
season=season_num,
episode=episode_num
)
if specific_image:
image = specific_image
# 类型
if media_type == MediaType.MOVIE:
msg = f'电影 {media_name} {tmdb_id}'
# 删除电视剧
elif media_type == MediaType.TV and not season_num and not episode_num:
msg = f'剧集 {media_name} {tmdb_id}'
# 删除季 S02
elif media_type == MediaType.TV and season_num and not episode_num:
msg = f'剧集 {media_name} S{season_num} {tmdb_id}'
# 删除剧集S02E02
elif media_type == MediaType.TV and season_num and episode_num:
msg = f'剧集 {media_name} S{season_num}E{episode_num} {tmdb_id}'
else:
msg = media_name
# 发送通知
self.post_message(
mtype=NotificationType.MediaServer,
title="云盘同步删除任务完成",
image=image,
text=f"{msg}\n"
f"时间 {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}"
)
# 读取历史记录
history = self.get_data('history') or []
history.append({
"type": media_type.value,
"title": media_name,
"path": media_path,
"season": season_num,
"episode": episode_num,
"image": image,
"del_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
})
# 保存历史
self.save_data("history", history)
def get_state(self) -> bool:
return self._enabled
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
def get_api(self) -> List[Dict[str, Any]]:
pass
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面需要返回两块数据1、页面配置2、数据结构
"""
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'enabled',
'label': '启用插件',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'notify',
'label': '开启通知',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTextarea',
'props': {
'model': 'path',
'rows': '2',
'label': '媒体库路径映射',
'placeholder': '媒体服务器路径:moviepilot内云盘挂载路径一行一个'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'text': '需要开启媒体库删除插件且正确配置排除路径。'
'主要针对于strm文件删除后同步删除云盘资源。'
'如遇删除失败,请检查文件权限问题。'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'text': '关于路径映射:'
'emby:/data/series/A.mp4,'
'moviepilot内云盘挂载路径:/mnt/link/series/A.mp4。'
'路径映射填/data:/mnt/link'
}
}
]
}
]
},
]
}
], {
"enabled": False,
"path": "",
"notify": False
}
def get_page(self) -> List[dict]:
"""
拼装插件详情页面,需要返回页面配置,同时附带数据
"""
# 查询同步详情
historys = self.get_data('history')
if not historys:
return [
{
'component': 'div',
'text': '暂无数据',
'props': {
'class': 'text-center',
}
}
]
# 数据按时间降序排序
historys = sorted(historys, key=lambda x: x.get('del_time'), reverse=True)
# 拼装页面
contents = []
for history in historys:
htype = history.get("type")
title = history.get("title")
season = history.get("season")
episode = history.get("episode")
image = history.get("image")
del_time = history.get("del_time")
if season:
sub_contents = [
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'类型:{htype}'
},
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'标题:{title}'
},
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'季:{season}'
},
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'集:{episode}'
},
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'时间:{del_time}'
}
]
else:
sub_contents = [
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'类型:{htype}'
},
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'标题:{title}'
},
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'时间:{del_time}'
}
]
contents.append(
{
'component': 'VCard',
'content': [
{
'component': 'div',
'props': {
'class': 'd-flex justify-space-start flex-nowrap flex-row',
},
'content': [
{
'component': 'div',
'content': [
{
'component': 'VImg',
'props': {
'src': image,
'height': 120,
'width': 80,
'aspect-ratio': '2/3',
'class': 'object-cover shadow ring-gray-500',
'cover': True
}
}
]
},
{
'component': 'div',
'content': sub_contents
}
]
}
]
}
)
return [
{
'component': 'div',
'props': {
'class': 'grid gap-3 grid-info-card',
},
'content': contents
}
]
def stop_service(self):
"""
退出插件
"""
pass