diff --git a/app/core/plugin.py b/app/core/plugin.py index 72f4990d..1140e1af 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -1,11 +1,11 @@ import concurrent import concurrent.futures import inspect -import os import threading import time import traceback -from typing import List, Any, Dict, Tuple, Optional, Callable +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional, Tuple from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer @@ -13,8 +13,8 @@ from watchdog.observers import Observer from app import schemas from app.core.config import settings from app.core.event import eventmanager -from app.db.systemconfig_oper import SystemConfigOper from app.db.plugindata_oper import PluginDataOper +from app.db.systemconfig_oper import SystemConfigOper from app.helper.module import ModuleHelper from app.helper.plugin import PluginHelper from app.helper.sites import SitesHelper @@ -42,21 +42,35 @@ class PluginMonitorHandler(FileSystemEventHandler): """ if event.is_directory: return + # 使用 pathlib 处理文件路径,跳过非 .py 文件以及 pycache 目录中的文件 + event_path = Path(event.src_path) + if not event_path.name.endswith(".py") or "pycache" in event_path.parts: + return + current_time = time.time() if current_time - self.__last_modified < self.__timeout: return self.__last_modified = current_time # 读取插件根目录下的__init__.py文件,读取class XXXX(_PluginBase)的类名 try: - # 使用os.path和pathlib处理跨平台的路径问题 - plugin_dir = event.src_path.split("plugins" + os.sep)[1].split(os.sep)[0] - init_file = settings.ROOT_PATH / "app" / "plugins" / plugin_dir / "__init__.py" + plugins_root = settings.ROOT_PATH / "app" / "plugins" + # 确保修改的文件在 plugins 目录下 + if plugins_root not in event_path.parents: + return + # 获取插件目录路径,没有找到__init__.py时,说明不是有效包,跳过插件重载 + # 插件重载目前没有支持app/plugins/plugin/package/__init__.py的场景,这里也不做支持 + plugin_dir = event_path.parent + init_file = plugin_dir / "__init__.py" + if not init_file.exists(): + logger.debug(f"{plugin_dir} 下没有找到 __init__.py,跳过插件重载") + return + with open(init_file, "r", encoding="utf-8") as f: lines = f.readlines() pid = None for line in lines: if line.startswith("class") and "(_PluginBase)" in line: - pid = line.split("class ")[1].split("(_PluginBase)")[0] + pid = line.split("class ")[1].split("(_PluginBase)")[0].strip() if pid: # 防抖处理,通过计时器延迟加载 if self.__reload_timer: