fix #607 目录监控全量同步
This commit is contained in:
parent
c94866631b
commit
70f533684f
@ -1,12 +1,12 @@
|
|||||||
|
import datetime
|
||||||
import re
|
import re
|
||||||
import shutil
|
import shutil
|
||||||
import threading
|
import threading
|
||||||
import traceback
|
import traceback
|
||||||
from datetime import datetime
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from threading import Event
|
|
||||||
from typing import List, Tuple, Dict, Any
|
from typing import List, Tuple, Dict, Any
|
||||||
|
|
||||||
|
import pytz
|
||||||
from apscheduler.schedulers.background import BackgroundScheduler
|
from apscheduler.schedulers.background import BackgroundScheduler
|
||||||
from watchdog.events import FileSystemEventHandler
|
from watchdog.events import FileSystemEventHandler
|
||||||
from watchdog.observers import Observer
|
from watchdog.observers import Observer
|
||||||
@ -16,6 +16,7 @@ from app.chain.tmdb import TmdbChain
|
|||||||
from app.chain.transfer import TransferChain
|
from app.chain.transfer import TransferChain
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
from app.core.context import MediaInfo
|
from app.core.context import MediaInfo
|
||||||
|
from app.core.event import eventmanager, Event
|
||||||
from app.core.metainfo import MetaInfoPath
|
from app.core.metainfo import MetaInfoPath
|
||||||
from app.db.downloadhistory_oper import DownloadHistoryOper
|
from app.db.downloadhistory_oper import DownloadHistoryOper
|
||||||
from app.db.transferhistory_oper import TransferHistoryOper
|
from app.db.transferhistory_oper import TransferHistoryOper
|
||||||
@ -79,6 +80,7 @@ class DirMonitor(_PluginBase):
|
|||||||
_observer = []
|
_observer = []
|
||||||
_enabled = False
|
_enabled = False
|
||||||
_notify = False
|
_notify = False
|
||||||
|
_onlyonce = False
|
||||||
# 模式 compatibility/fast
|
# 模式 compatibility/fast
|
||||||
_mode = "fast"
|
_mode = "fast"
|
||||||
# 转移方式
|
# 转移方式
|
||||||
@ -91,7 +93,7 @@ class DirMonitor(_PluginBase):
|
|||||||
_transferconf: Dict[str, str] = {}
|
_transferconf: Dict[str, str] = {}
|
||||||
_medias = {}
|
_medias = {}
|
||||||
# 退出事件
|
# 退出事件
|
||||||
_event = Event()
|
_event = threading.Event()
|
||||||
|
|
||||||
def init_plugin(self, config: dict = None):
|
def init_plugin(self, config: dict = None):
|
||||||
self.transferhis = TransferHistoryOper(self.db)
|
self.transferhis = TransferHistoryOper(self.db)
|
||||||
@ -106,6 +108,7 @@ class DirMonitor(_PluginBase):
|
|||||||
if config:
|
if config:
|
||||||
self._enabled = config.get("enabled")
|
self._enabled = config.get("enabled")
|
||||||
self._notify = config.get("notify")
|
self._notify = config.get("notify")
|
||||||
|
self._onlyonce = config.get("onlyonce")
|
||||||
self._mode = config.get("mode")
|
self._mode = config.get("mode")
|
||||||
self._transfer_type = config.get("transfer_type")
|
self._transfer_type = config.get("transfer_type")
|
||||||
self._monitor_dirs = config.get("monitor_dirs") or ""
|
self._monitor_dirs = config.get("monitor_dirs") or ""
|
||||||
@ -114,10 +117,13 @@ class DirMonitor(_PluginBase):
|
|||||||
# 停止现有任务
|
# 停止现有任务
|
||||||
self.stop_service()
|
self.stop_service()
|
||||||
|
|
||||||
if self._enabled:
|
if self._enabled or self._onlyonce:
|
||||||
|
# 定时服务管理器
|
||||||
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
|
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
|
||||||
|
# 追加入库消息统一发送服务
|
||||||
|
self._scheduler.add_job(self.send_msg, trigger='interval', seconds=15)
|
||||||
|
|
||||||
# 启动任务
|
# 读取目录配置
|
||||||
monitor_dirs = self._monitor_dirs.split("\n")
|
monitor_dirs = self._monitor_dirs.split("\n")
|
||||||
if not monitor_dirs:
|
if not monitor_dirs:
|
||||||
return
|
return
|
||||||
@ -152,6 +158,8 @@ class DirMonitor(_PluginBase):
|
|||||||
# 转移方式
|
# 转移方式
|
||||||
self._transferconf[mon_path] = _transfer_type
|
self._transferconf[mon_path] = _transfer_type
|
||||||
|
|
||||||
|
# 启用目录监控
|
||||||
|
if self._enabled:
|
||||||
# 检查媒体库目录是不是下载目录的子目录
|
# 检查媒体库目录是不是下载目录的子目录
|
||||||
try:
|
try:
|
||||||
if target_path and target_path.is_relative_to(Path(mon_path)):
|
if target_path and target_path.is_relative_to(Path(mon_path)):
|
||||||
@ -188,12 +196,63 @@ class DirMonitor(_PluginBase):
|
|||||||
logger.error(f"{mon_path} 启动目录监控失败:{err_msg}")
|
logger.error(f"{mon_path} 启动目录监控失败:{err_msg}")
|
||||||
self.systemmessage.put(f"{mon_path} 启动目录监控失败:{err_msg}")
|
self.systemmessage.put(f"{mon_path} 启动目录监控失败:{err_msg}")
|
||||||
|
|
||||||
# 追加入库消息统一发送服务
|
# 运行一次定时服务
|
||||||
self._scheduler.add_job(self.send_msg, trigger='interval', seconds=15)
|
if self._onlyonce:
|
||||||
# 启动服务
|
logger.info("目录监控服务启动,立即运行一次")
|
||||||
|
self._scheduler.add_job(func=self.sync_all, trigger='date',
|
||||||
|
run_date=datetime.datetime.now(
|
||||||
|
tz=pytz.timezone(settings.TZ)) + datetime.timedelta(seconds=3)
|
||||||
|
)
|
||||||
|
# 关闭一次性开关
|
||||||
|
self._onlyonce = False
|
||||||
|
# 保存配置
|
||||||
|
self.__update_config()
|
||||||
|
|
||||||
|
# 启动定时服务
|
||||||
|
if self._scheduler.get_jobs():
|
||||||
self._scheduler.print_jobs()
|
self._scheduler.print_jobs()
|
||||||
self._scheduler.start()
|
self._scheduler.start()
|
||||||
|
|
||||||
|
def __update_config(self):
|
||||||
|
"""
|
||||||
|
更新配置
|
||||||
|
"""
|
||||||
|
self.update_config({
|
||||||
|
"enabled": self._enabled,
|
||||||
|
"notify": self._notify,
|
||||||
|
"onlyonce": self._onlyonce,
|
||||||
|
"mode": self._mode,
|
||||||
|
"transfer_type": self._transfer_type,
|
||||||
|
"monitor_dirs": self._monitor_dirs,
|
||||||
|
"exclude_keywords": self._exclude_keywords
|
||||||
|
})
|
||||||
|
|
||||||
|
@eventmanager.register(EventType.DirectorySync)
|
||||||
|
def remote_sync(self, event: Event):
|
||||||
|
"""
|
||||||
|
远程全量同步
|
||||||
|
"""
|
||||||
|
if event:
|
||||||
|
self.post_message(channel=event.event_data.get("channel"),
|
||||||
|
title="开始同步监控目录 ...",
|
||||||
|
userid=event.event_data.get("user"))
|
||||||
|
self.sync_all()
|
||||||
|
if event:
|
||||||
|
self.post_message(channel=event.event_data.get("channel"),
|
||||||
|
title="监控目录同步完成!", userid=event.event_data.get("user"))
|
||||||
|
|
||||||
|
def sync_all(self):
|
||||||
|
"""
|
||||||
|
立即运行一次,全量同步目录中所有文件
|
||||||
|
"""
|
||||||
|
logger.info("开始全量同步监控目录 ...")
|
||||||
|
# 遍历所有监控目录
|
||||||
|
for mon_path in self._dirconf.keys():
|
||||||
|
# 遍历目录下所有文件
|
||||||
|
for file_path in SystemUtils.list_files(Path(mon_path), settings.RMT_MEDIAEXT):
|
||||||
|
self.__handle_file(event_path=str(file_path), mon_path=mon_path)
|
||||||
|
logger.info("全量同步监控目录完成!")
|
||||||
|
|
||||||
def event_handler(self, event, mon_path: str, text: str, event_path: str):
|
def event_handler(self, event, mon_path: str, text: str, event_path: str):
|
||||||
"""
|
"""
|
||||||
处理文件变化
|
处理文件变化
|
||||||
@ -204,13 +263,19 @@ class DirMonitor(_PluginBase):
|
|||||||
"""
|
"""
|
||||||
if not event.is_directory:
|
if not event.is_directory:
|
||||||
# 文件发生变化
|
# 文件发生变化
|
||||||
|
logger.debug("文件%s:%s" % (text, event_path))
|
||||||
|
self.__handle_file(event_path=event_path, mon_path=mon_path)
|
||||||
|
|
||||||
|
def __handle_file(self, event_path: str, mon_path: str):
|
||||||
|
"""
|
||||||
|
同步一个文件
|
||||||
|
:param event_path: 事件文件路径
|
||||||
|
:param mon_path: 监控目录
|
||||||
|
"""
|
||||||
file_path = Path(event_path)
|
file_path = Path(event_path)
|
||||||
try:
|
try:
|
||||||
if not file_path.exists():
|
if not file_path.exists():
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.debug("文件%s:%s" % (text, event_path))
|
|
||||||
|
|
||||||
# 全程加锁
|
# 全程加锁
|
||||||
with lock:
|
with lock:
|
||||||
transfer_history = self.transferhis.get_by_src(event_path)
|
transfer_history = self.transferhis.get_by_src(event_path)
|
||||||
@ -402,7 +467,7 @@ class DirMonitor(_PluginBase):
|
|||||||
]
|
]
|
||||||
media_list = {
|
media_list = {
|
||||||
"files": media_files,
|
"files": media_files,
|
||||||
"time": datetime.now()
|
"time": datetime.datetime.now()
|
||||||
}
|
}
|
||||||
else:
|
else:
|
||||||
media_list = {
|
media_list = {
|
||||||
@ -414,7 +479,7 @@ class DirMonitor(_PluginBase):
|
|||||||
"transferinfo": transferinfo
|
"transferinfo": transferinfo
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"time": datetime.now()
|
"time": datetime.datetime.now()
|
||||||
}
|
}
|
||||||
self._medias[mediainfo.title_year + " " + file_meta.season] = media_list
|
self._medias[mediainfo.title_year + " " + file_meta.season] = media_list
|
||||||
|
|
||||||
@ -467,7 +532,7 @@ class DirMonitor(_PluginBase):
|
|||||||
file_meta = media_files[0].get("file_meta")
|
file_meta = media_files[0].get("file_meta")
|
||||||
mediainfo = media_files[0].get("mediainfo")
|
mediainfo = media_files[0].get("mediainfo")
|
||||||
# 判断最后更新时间距现在是已超过5秒,超过则发送消息
|
# 判断最后更新时间距现在是已超过5秒,超过则发送消息
|
||||||
if (datetime.now() - last_update_time).total_seconds() > 5:
|
if (datetime.datetime.now() - last_update_time).total_seconds() > 5:
|
||||||
# 发送通知
|
# 发送通知
|
||||||
if self._notify:
|
if self._notify:
|
||||||
|
|
||||||
@ -519,7 +584,17 @@ class DirMonitor(_PluginBase):
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_command() -> List[Dict[str, Any]]:
|
def get_command() -> List[Dict[str, Any]]:
|
||||||
pass
|
"""
|
||||||
|
定义远程控制命令
|
||||||
|
:return: 命令关键字、事件、描述、附带数据
|
||||||
|
"""
|
||||||
|
return [{
|
||||||
|
"cmd": "/directory_sync",
|
||||||
|
"event": EventType.DirectorySync,
|
||||||
|
"desc": "目录监控同步",
|
||||||
|
"category": "管理",
|
||||||
|
"data": {}
|
||||||
|
}]
|
||||||
|
|
||||||
def get_api(self) -> List[Dict[str, Any]]:
|
def get_api(self) -> List[Dict[str, Any]]:
|
||||||
pass
|
pass
|
||||||
@ -536,7 +611,7 @@ class DirMonitor(_PluginBase):
|
|||||||
'component': 'VCol',
|
'component': 'VCol',
|
||||||
'props': {
|
'props': {
|
||||||
'cols': 12,
|
'cols': 12,
|
||||||
'md': 6
|
'md': 4
|
||||||
},
|
},
|
||||||
'content': [
|
'content': [
|
||||||
{
|
{
|
||||||
@ -552,7 +627,7 @@ class DirMonitor(_PluginBase):
|
|||||||
'component': 'VCol',
|
'component': 'VCol',
|
||||||
'props': {
|
'props': {
|
||||||
'cols': 12,
|
'cols': 12,
|
||||||
'md': 6
|
'md': 4
|
||||||
},
|
},
|
||||||
'content': [
|
'content': [
|
||||||
{
|
{
|
||||||
@ -563,6 +638,22 @@ class DirMonitor(_PluginBase):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'component': 'VCol',
|
||||||
|
'props': {
|
||||||
|
'cols': 12,
|
||||||
|
'md': 4
|
||||||
|
},
|
||||||
|
'content': [
|
||||||
|
{
|
||||||
|
'component': 'VSwitch',
|
||||||
|
'props': {
|
||||||
|
'model': 'onlyonce',
|
||||||
|
'label': '立即运行一次',
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
@ -628,11 +719,11 @@ class DirMonitor(_PluginBase):
|
|||||||
'model': 'monitor_dirs',
|
'model': 'monitor_dirs',
|
||||||
'label': '监控目录',
|
'label': '监控目录',
|
||||||
'rows': 5,
|
'rows': 5,
|
||||||
'placeholder': '每一行一个目录,支持三种配置方式:\n'
|
'placeholder': '每一行一个目录,支持三种配置方式,转移方式支持 move、copy、link、softlink、rclone_copy、rclone_move:\n'
|
||||||
'监控目录\n'
|
'监控目录\n'
|
||||||
'监控目录#转移方式(move|copy|link|softlink|rclone_copy|rclone_move)\n'
|
'监控目录#转移方式\n'
|
||||||
'监控目录:转移目的目录(需同时在媒体库目录中配置该目的目录)\n'
|
'监控目录:转移目的目录\n'
|
||||||
'监控目录:转移目的目录#转移方式(move|copy|link|softlink|rclone_copy|rclone_move)'
|
'监控目录:转移目的目录#转移方式'
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
@ -666,6 +757,7 @@ class DirMonitor(_PluginBase):
|
|||||||
], {
|
], {
|
||||||
"enabled": False,
|
"enabled": False,
|
||||||
"notify": False,
|
"notify": False,
|
||||||
|
"onlyonce": False,
|
||||||
"mode": "fast",
|
"mode": "fast",
|
||||||
"transfer_type": settings.TRANSFER_TYPE,
|
"transfer_type": settings.TRANSFER_TYPE,
|
||||||
"monitor_dirs": "",
|
"monitor_dirs": "",
|
||||||
|
@ -44,6 +44,8 @@ class EventType(Enum):
|
|||||||
NameRecognize = "name.recognize"
|
NameRecognize = "name.recognize"
|
||||||
# 名称识别结果
|
# 名称识别结果
|
||||||
NameRecognizeResult = "name.recognize.result"
|
NameRecognizeResult = "name.recognize.result"
|
||||||
|
# 目录监控同步
|
||||||
|
DirectorySync = "directory.sync"
|
||||||
|
|
||||||
|
|
||||||
# 系统配置Key字典
|
# 系统配置Key字典
|
||||||
|
Loading…
x
Reference in New Issue
Block a user