fix 目录监控转移消息统一发送

This commit is contained in:
thsrite 2023-08-24 13:07:31 +08:00
parent 10ee8d33fa
commit dd9258dc42
2 changed files with 279 additions and 149 deletions

View File

@ -3,9 +3,13 @@ import shutil
import threading
import time
import traceback
from datetime import datetime
from pathlib import Path
from typing import List, Tuple, Dict, Any
from threading import Event
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
@ -21,7 +25,7 @@ from app.modules.qbittorrent import Qbittorrent
from app.modules.transmission import Transmission
from app.plugins import _PluginBase
from app.schemas import Notification, NotificationType, TransferInfo
from app.schemas.types import EventType
from app.schemas.types import EventType, MediaType
from app.utils.system import SystemUtils
lock = threading.Lock()
@ -72,6 +76,7 @@ class DirMonitor(_PluginBase):
_synced_files = []
# 私有属性
_scheduler = None
transferhis = None
downloadhis = None
transferchian = None
@ -88,6 +93,9 @@ class DirMonitor(_PluginBase):
_dirconf: Dict[str, Path] = {}
qb = None
tr = None
_medias = {}
# 退出事件
_event = Event()
def init_plugin(self, config: dict = None):
self.transferhis = TransferHistoryOper(self.db)
@ -112,6 +120,7 @@ class DirMonitor(_PluginBase):
if self._enabled:
self.qb = Qbittorrent()
self.tr = Transmission()
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
# 启动任务
monitor_dirs = self._monitor_dirs.split("\n")
@ -165,6 +174,12 @@ class DirMonitor(_PluginBase):
logger.error(f"{mon_path} 启动目录监控失败:{err_msg}")
self.systemmessage.put(f"{mon_path} 启动目录监控失败:{err_msg}")
# 追加入库消息统一发送服务
self._scheduler.add_job(self.send_msg, trigger='interval', seconds=5)
# 启动服务
self._scheduler.print_jobs()
self._scheduler.start()
def event_handler(self, event, mon_path: str, text: str, event_path: str):
"""
处理文件变化
@ -292,12 +307,68 @@ class DirMonitor(_PluginBase):
# 刮削元数据
self.chain.scrape_metadata(path=transferinfo.target_path, mediainfo=mediainfo)
"""
{
"title_year season": {
"files": [
{
"path":,
"mediainfo":,
"file_meta":,
"transferinfo":
}
],
"time": "2023-08-24 23:23:23.332"
}
}
"""
# 发送消息汇总
media_list = self._medias.get(mediainfo.title_year + " " + meta.season) or {}
if media_list:
media_files = media_list.get("files") or []
if media_files:
file_exists = False
for file in media_files:
if str(event_path) == file.get("path"):
file_exists = True
break
if not file_exists:
media_files.append({
"path": event_path,
"mediainfo": mediainfo,
"file_meta": file_meta,
"transferinfo": transferinfo
})
else:
media_files = [
{
"path": event_path,
"mediainfo": mediainfo,
"file_meta": file_meta,
"transferinfo": transferinfo
}
]
media_list = {
"files": media_files,
"time": datetime.now()
}
else:
media_list = {
"files": [
{
"path": event_path,
"mediainfo": mediainfo,
"file_meta": file_meta,
"transferinfo": transferinfo
}
],
"time": datetime.now()
}
self._medias[mediainfo.title_year + " " + meta.season] = media_list
# 刷新媒体库
self.chain.refresh_mediaserver(mediainfo=mediainfo, file_path=transferinfo.target_path)
# 发送通知
if self._notify:
self.transferchian.send_transfer_message(meta=file_meta, mediainfo=mediainfo,
transferinfo=transferinfo)
# 广播事件
self.eventmanager.send_event(EventType.TransferComplete, {
'meta': file_meta,
@ -319,6 +390,58 @@ class DirMonitor(_PluginBase):
except Exception as e:
logger.error("目录监控发生错误:%s - %s" % (str(e), traceback.format_exc()))
def send_msg(self):
"""
定时检查是否有媒体处理完发送统一消息
"""
if not self._medias or not self._medias.keys():
return
# 遍历检查是否已刮削完,发送消息
for medis_title_year_season in list(self._medias.keys()):
media_list = self._medias.get(medis_title_year_season)
logger.info(f"开始处理媒体 {medis_title_year_season} 消息")
if not media_list:
continue
# 获取最后更新时间
last_update_time = media_list.get("time")
media_files = media_list.get("files")
if not last_update_time or not media_files:
continue
transferinfo = media_files[0].get("transferinfo")
file_meta = media_files[0].get("file_meta")
mediainfo = media_files[0].get("mediainfo")
# 判断最后更新时间距现在是已超过3秒超过则发送消息
if (datetime.now() - last_update_time).total_seconds() > 3:
# 发送通知
if self._notify:
# 汇总处理文件总大小
total_size = 0
file_count = 0
for file in media_files:
transferinfo = file.get("transferinfo")
total_size += transferinfo.total_size
file_count += 1
transferinfo.total_size = total_size
# 汇总处理文件数量
transferinfo.file_count = file_count
# 处理文件多,说明是剧集,显示季入库消息
if mediainfo.type == MediaType.TV and file_count > 1:
file_meta.begin_episode = file_meta.begin_episode
file_meta.end_episode = media_files[-1].get("file_meta").end_episode
self.transferchian.send_transfer_message(meta=file_meta,
mediainfo=mediainfo,
transferinfo=transferinfo)
# 发送完消息移出key
del self._medias[medis_title_year_season]
continue
def get_download_hash(self, src: Path, tmdb_id: int):
"""
获取download_hash
@ -551,3 +674,10 @@ class DirMonitor(_PluginBase):
except Exception as e:
print(str(e))
self._observer = []
if self._scheduler:
self._scheduler.remove_all_jobs()
if self._scheduler.running:
self._event.set()
self._scheduler.shutdown()
self._event.clear()
self._scheduler = None