diff --git a/app/api/endpoints/plugin.py b/app/api/endpoints/plugin.py index c78f47c7..c3b020b3 100644 --- a/app/api/endpoints/plugin.py +++ b/app/api/endpoints/plugin.py @@ -8,6 +8,7 @@ from app.core.security import verify_token from app.db.systemconfig_oper import SystemConfigOper from app.helper.plugin import PluginHelper from app.schemas.types import SystemConfigKey +from app.scheduler import Scheduler router = APIRouter() @@ -90,6 +91,8 @@ def install(plugin_id: str, SystemConfigOper().set(SystemConfigKey.UserInstalledPlugins, install_plugins) # 重载插件管理器 PluginManager().init_config() + # 注册插件服务 + Scheduler().update_plugin_job(plugin_id) return schemas.Response(success=True) @@ -123,6 +126,8 @@ def reset_plugin(plugin_id: str, _: schemas.TokenPayload = Depends(verify_token) PluginManager().delete_plugin_config(plugin_id) # 重新生效插件 PluginManager().reload_plugin(plugin_id, {}) + # 注册插件服务 + Scheduler().update_plugin_job(plugin_id) return schemas.Response(success=True) @@ -144,6 +149,8 @@ def set_plugin_config(plugin_id: str, conf: dict, PluginManager().save_plugin_config(plugin_id, conf) # 重新生效插件 PluginManager().reload_plugin(plugin_id, conf) + # 注册插件服务 + Scheduler().update_plugin_job(plugin_id) return schemas.Response(success=True) @@ -163,6 +170,8 @@ def uninstall_plugin(plugin_id: str, SystemConfigOper().set(SystemConfigKey.UserInstalledPlugins, install_plugins) # 重载插件管理器 PluginManager().init_config() + # 移除插件服务 + Scheduler().remove_plugin_job(plugin_id) return schemas.Response(success=True) diff --git a/app/core/plugin.py b/app/core/plugin.py index af72bc4e..8e4aa2d3 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -246,6 +246,16 @@ class PluginManager(metaclass=Singleton): ret_services.extend(services) return ret_services + def get_plugin_attr(self, pid: str, attr: str) -> Any: + """ + 获取插件属性 + """ + if not self._running_plugins.get(pid): + return None + if not hasattr(self._running_plugins[pid], attr): + return None + return getattr(self._running_plugins[pid], attr) + def run_plugin_method(self, pid: str, method: str, *args, **kwargs) -> Any: """ 运行插件方法 @@ -262,6 +272,12 @@ class PluginManager(metaclass=Singleton): """ return list(self._plugins.keys()) + def get_running_plugin_ids(self) -> List[str]: + """ + 获取所有运行态插件ID + """ + return list(self._running_plugins.keys()) + def get_online_plugins(self) -> List[dict]: """ 获取所有在线插件信息 diff --git a/app/scheduler.py b/app/scheduler.py index 1bb7852d..2e27d0fc 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -43,6 +43,8 @@ class Scheduler(metaclass=Singleton): }) # 退出事件 _event = threading.Event() + # 锁 + _lock = threading.Lock() def __init__(self): @@ -232,25 +234,8 @@ class Scheduler(metaclass=Singleton): ) # 注册插件公共服务 - plugin_services = PluginManager().get_plugin_services() - for service in plugin_services: - try: - self._jobs[service["id"]] = { - "func": service["func"], - "running": False, - } - self._scheduler.add_job( - self.start, - service["trigger"], - id=service["id"], - name=service["name"], - **service["kwargs"], - kwargs={ - 'job_id': service["id"] - } - ) - except Exception as e: - logger.error(f"注册插件服务失败:{str(e)} - {service}") + for pid in PluginManager().get_running_plugin_ids(): + self.update_plugin_job(pid) # 打印服务 logger.debug(self._scheduler.print_jobs()) @@ -263,51 +248,134 @@ class Scheduler(metaclass=Singleton): 启动定时服务 """ # 处理job_id格式 - job = self._jobs.get(job_id) - if not job: - return - if job.get("running"): - logger.warning(f"定时任务 {job_id} 正在运行 ...") - return - self._jobs[job_id]["running"] = True + with self._lock: + job = self._jobs.get(job_id) + if not job: + return + if job.get("running"): + logger.warning(f"定时任务 {job_id} 正在运行 ...") + return + self._jobs[job_id]["running"] = True try: if not kwargs: kwargs = job.get("kwargs") or {} job["func"](*args, **kwargs) except Exception as e: logger.error(f"定时任务 {job_id} 执行失败:{str(e)}") - self._jobs[job_id]["running"] = False + # 如果在job["func"]()运行时, 编辑配置导致任务被移除时, 该id已经不存在, 忽略错误 + with self._lock: + try: + self._jobs[job_id]["running"] = False + except: + pass + # 如果是单次任务, 应立即移除缓存 + if not self._scheduler.get_job(job_id): + self._jobs.pop(job_id, None) + + def update_plugin_job(self, pid: str): + """ + 更新插件定时服务 + """ + # 移除该插件的全部服务 + self.remove_plugin_job(pid) + # 获取插件服务列表 + with self._lock: + try: + plugin_services = PluginManager().run_plugin_method(pid, "get_service") or [] + except: + return + # 获取插件名称 + plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name") + # 开始注册插件服务 + for service in plugin_services: + try: + sid = f"{pid}_{service['id']}" + self._jobs[sid] = { + "func": service["func"], + "name": service["name"], + "pid": pid, + "plugin_name": plugin_name, + "running": False, + } + self._scheduler.add_job( + self.start, + service["trigger"], + id=sid, + name=service["name"], + **service["kwargs"], + kwargs={ + 'job_id': sid + } + ) + logger.info(f"注册插件服务({plugin_name}):{service['name']}") + except Exception as e: + logger.error(f"注册插件服务失败:{str(e)} - {service}") + + def remove_plugin_job(self, pid: str): + """ + 移除插件定时服务 + """ + with self._lock: + # 获取插件名称 + plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name") + for job_id, service in self._jobs.copy().items(): + try: + if service.get("pid") == pid: + self._jobs.pop(job_id, None) + try: + self._scheduler.remove_job(job_id) + except: + pass + logger.info(f"移除插件服务({plugin_name}):{service.get('name')}") + except Exception as e: + logger.error(f"移除插件服务失败:{str(e)} - {job_id}: {service}") def list(self) -> List[schemas.ScheduleInfo]: """ 当前所有任务 """ - # 返回计时任务 - schedulers = [] - # 去重 - added = [] - jobs = self._scheduler.get_jobs() - # 按照下次运行时间排序 - jobs.sort(key=lambda x: x.next_run_time) - for job in jobs: - if job.name not in added: - added.append(job.name) - else: - continue - job_id = job.id.split("|")[0] - if not self._jobs.get(job_id): - continue - # 任务状态 - status = "正在运行" if self._jobs[job_id].get("running") else "等待" - # 下次运行时间 - next_run = TimerUtils.time_difference(job.next_run_time) - schedulers.append(schemas.ScheduleInfo( - id=job_id, - name=job.name, - status=status, - next_run=next_run - )) - return schedulers + with self._lock: + # 返回计时任务 + schedulers = [] + # 去重 + added = [] + jobs = self._scheduler.get_jobs() + # 按照下次运行时间排序 + jobs.sort(key=lambda x: x.next_run_time) + # 将正在运行的任务提取出来 (保障一次性任务正常显示) + for job_id, service in self._jobs.items(): + name = service.get("name") + plugin_name = service.get("plugin_name") + if service.get("running") and name and plugin_name: + schedulers.append(schemas.ScheduleInfo( + id=job_id, + name=name, + provider=plugin_name, + status="正在运行", + next_run="~" + )) + # 获取其他待执行任务 + for job in jobs: + if job.name not in added: + added.append(job.name) + else: + continue + job_id = job.id.split("|")[0] + service = self._jobs.get(job_id) + if not service: + continue + # 任务状态 + status = "正在运行" if service.get("running") else "等待" + # 下次运行时间 + next_run = TimerUtils.time_difference(job.next_run_time) + schedulers.append(schemas.ScheduleInfo( + id=job_id, + name=job.name, + provider=service.get("plugin_name", "MoviePilot"), + status=status, + next_run=next_run + )) + return schedulers def stop(self): """ diff --git a/app/schemas/dashboard.py b/app/schemas/dashboard.py index 02dbbd23..33e83883 100644 --- a/app/schemas/dashboard.py +++ b/app/schemas/dashboard.py @@ -56,6 +56,8 @@ class ScheduleInfo(BaseModel): id: Optional[str] = None # 名称 name: Optional[str] = None + # 提供者 + provider: Optional[str] = None # 状态 status: Optional[str] = None # 下次执行时间