From bbddec763a59acbf70f74dc923e5282d99d68226 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=AE=E5=8F=AE=E5=BD=93?= <604054726@qq.com> Date: Fri, 23 Feb 2024 18:13:27 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E6=8F=92=E4=BB=B6=E7=9B=B4=E6=8E=A5?= =?UTF-8?q?=E9=87=87=E7=94=A8=E7=A8=8B=E5=BA=8F=E7=9A=84=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E6=A8=A1=E5=9D=97,=20=E5=8F=AF=E6=98=BE?= =?UTF-8?q?=E7=A4=BA=E5=9C=A8=E5=89=8D=E7=AB=AF=E9=A1=B5=E9=9D=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/endpoints/plugin.py | 9 ++ app/core/plugin.py | 6 ++ app/scheduler.py | 167 ++++++++++++++++++++++++------------ 3 files changed, 129 insertions(+), 53 deletions(-) diff --git a/app/api/endpoints/plugin.py b/app/api/endpoints/plugin.py index c78f47c7..c3b020b3 100644 --- a/app/api/endpoints/plugin.py +++ b/app/api/endpoints/plugin.py @@ -8,6 +8,7 @@ from app.core.security import verify_token from app.db.systemconfig_oper import SystemConfigOper from app.helper.plugin import PluginHelper from app.schemas.types import SystemConfigKey +from app.scheduler import Scheduler router = APIRouter() @@ -90,6 +91,8 @@ def install(plugin_id: str, SystemConfigOper().set(SystemConfigKey.UserInstalledPlugins, install_plugins) # 重载插件管理器 PluginManager().init_config() + # 注册插件服务 + Scheduler().update_plugin_job(plugin_id) return schemas.Response(success=True) @@ -123,6 +126,8 @@ def reset_plugin(plugin_id: str, _: schemas.TokenPayload = Depends(verify_token) PluginManager().delete_plugin_config(plugin_id) # 重新生效插件 PluginManager().reload_plugin(plugin_id, {}) + # 注册插件服务 + Scheduler().update_plugin_job(plugin_id) return schemas.Response(success=True) @@ -144,6 +149,8 @@ def set_plugin_config(plugin_id: str, conf: dict, PluginManager().save_plugin_config(plugin_id, conf) # 重新生效插件 PluginManager().reload_plugin(plugin_id, conf) + # 注册插件服务 + Scheduler().update_plugin_job(plugin_id) return schemas.Response(success=True) @@ -163,6 +170,8 @@ def uninstall_plugin(plugin_id: str, SystemConfigOper().set(SystemConfigKey.UserInstalledPlugins, install_plugins) # 重载插件管理器 PluginManager().init_config() + # 移除插件服务 + Scheduler().remove_plugin_job(plugin_id) return schemas.Response(success=True) diff --git a/app/core/plugin.py b/app/core/plugin.py index af72bc4e..ad04d549 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -262,6 +262,12 @@ class PluginManager(metaclass=Singleton): """ return list(self._plugins.keys()) + def get_running_plugin_ids(self) -> List[str]: + """ + 获取所有运行态插件ID + """ + return list(self._running_plugins.keys()) + def get_online_plugins(self) -> List[dict]: """ 获取所有在线插件信息 diff --git a/app/scheduler.py b/app/scheduler.py index 1bb7852d..8dce1816 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -43,6 +43,8 @@ class Scheduler(metaclass=Singleton): }) # 退出事件 _event = threading.Event() + # 锁 + _lock = threading.Lock() def __init__(self): @@ -232,25 +234,8 @@ class Scheduler(metaclass=Singleton): ) # 注册插件公共服务 - plugin_services = PluginManager().get_plugin_services() - for service in plugin_services: - try: - self._jobs[service["id"]] = { - "func": service["func"], - "running": False, - } - self._scheduler.add_job( - self.start, - service["trigger"], - id=service["id"], - name=service["name"], - **service["kwargs"], - kwargs={ - 'job_id': service["id"] - } - ) - except Exception as e: - logger.error(f"注册插件服务失败:{str(e)} - {service}") + for pid in PluginManager().get_running_plugin_ids(): + self.update_plugin_job(pid) # 打印服务 logger.debug(self._scheduler.print_jobs()) @@ -263,51 +248,127 @@ class Scheduler(metaclass=Singleton): 启动定时服务 """ # 处理job_id格式 - job = self._jobs.get(job_id) - if not job: - return - if job.get("running"): - logger.warning(f"定时任务 {job_id} 正在运行 ...") - return - self._jobs[job_id]["running"] = True + with self._lock: + job = self._jobs.get(job_id) + if not job: + return + if job.get("running"): + logger.warning(f"定时任务 {job_id} 正在运行 ...") + return + self._jobs[job_id]["running"] = True try: if not kwargs: kwargs = job.get("kwargs") or {} job["func"](*args, **kwargs) except Exception as e: logger.error(f"定时任务 {job_id} 执行失败:{str(e)}") - self._jobs[job_id]["running"] = False + # 如果在job["func"]()运行时, 编辑配置导致任务被移除时, 该id已经不存在, 忽略错误 + with self._lock: + try: + self._jobs[job_id]["running"] = False + except: + pass + # 如果是单次任务, 应立即移除缓存 + if not self._scheduler.get_job(job_id): + self._jobs.pop(job_id, None) + + def update_plugin_job(self, pid: str): + """ + 更新插件定时服务 + """ + # 移除该插件的全部服务 + self.remove_plugin_job(pid) + # 获取插件服务列表 + with self._lock: + try: + plugin_services = PluginManager().run_plugin_method(pid, "get_service") or [] + except: + return + # 开始注册插件服务 + for service in plugin_services: + try: + sid = f"{pid}_{service['id']}" + self._jobs[sid] = { + "func": service["func"], + "name": service["name"], + "pid": pid, + "running": False, + } + self._scheduler.add_job( + self.start, + service["trigger"], + id=sid, + name=service["name"], + **service["kwargs"], + kwargs={ + 'job_id': sid + } + ) + logger.info(f"注册插件服务({pid}):{service['name']}") + except Exception as e: + logger.error(f"注册插件服务失败:{str(e)} - {service}") + + def remove_plugin_job(self, pid: str): + """ + 移除插件定时服务 + """ + with self._lock: + for job_id, service in self._jobs.copy().items(): + try: + if service.get("pid") == pid: + self._jobs.pop(job_id, None) + try: + self._scheduler.remove_job(job_id) + except: + pass + logger.info(f"移除插件服务({pid}):{service.get('name')}") + except Exception as e: + logger.error(f"移除插件服务失败:{str(e)} - {job_id}: {service}") def list(self) -> List[schemas.ScheduleInfo]: """ 当前所有任务 """ - # 返回计时任务 - schedulers = [] - # 去重 - added = [] - jobs = self._scheduler.get_jobs() - # 按照下次运行时间排序 - jobs.sort(key=lambda x: x.next_run_time) - for job in jobs: - if job.name not in added: - added.append(job.name) - else: - continue - job_id = job.id.split("|")[0] - if not self._jobs.get(job_id): - continue - # 任务状态 - status = "正在运行" if self._jobs[job_id].get("running") else "等待" - # 下次运行时间 - next_run = TimerUtils.time_difference(job.next_run_time) - schedulers.append(schemas.ScheduleInfo( - id=job_id, - name=job.name, - status=status, - next_run=next_run - )) - return schedulers + with self._lock: + # 返回计时任务 + schedulers = [] + # 去重 + added = [] + jobs = self._scheduler.get_jobs() + # 按照下次运行时间排序 + jobs.sort(key=lambda x: x.next_run_time) + # 将正在运行的任务提取出来 (保障一次性任务正常显示) + for job_id, value in self._jobs.items(): + name = value.get("name") + if value.get("running") and name: + if name not in added: + added.append(name) + else: + continue + schedulers.append(schemas.ScheduleInfo( + id=job_id, + name=name, + status="正在运行", + next_run="~" + )) + # 获取其他待执行任务 + for job in jobs: + if job.name not in added: + added.append(job.name) + else: + continue + job_id = job.id.split("|")[0] + if not self._jobs.get(job_id): + continue + # 下次运行时间 + next_run = TimerUtils.time_difference(job.next_run_time) + schedulers.append(schemas.ScheduleInfo( + id=job_id, + name=job.name, + status="等待", + next_run=next_run + )) + return schedulers def stop(self): """ From f23cab861a067458ce7569ad9654953b1f0fdf80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=AE=E5=8F=AE=E5=BD=93?= <604054726@qq.com> Date: Fri, 23 Feb 2024 21:39:58 +0800 Subject: [PATCH 2/3] =?UTF-8?q?fix=20=E8=8E=B7=E5=8F=96=E5=85=B6=E4=BB=96?= =?UTF-8?q?=E5=BE=85=E6=89=A7=E8=A1=8C=E4=BB=BB=E5=8A=A1>status?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/scheduler.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/app/scheduler.py b/app/scheduler.py index 8dce1816..d279b4f9 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -360,12 +360,14 @@ class Scheduler(metaclass=Singleton): job_id = job.id.split("|")[0] if not self._jobs.get(job_id): continue + # 任务状态 + status = "正在运行" if self._jobs[job_id].get("running") else "等待" # 下次运行时间 next_run = TimerUtils.time_difference(job.next_run_time) schedulers.append(schemas.ScheduleInfo( id=job_id, name=job.name, - status="等待", + status=status, next_run=next_run )) return schedulers From 185c78b05c94cce57df6de5479b94d90ebebcb6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=AE=E5=8F=AE=E5=BD=93?= <604054726@qq.com> Date: Fri, 23 Feb 2024 22:25:41 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E5=AE=9A=E6=97=B6=E4=BD=9C=E4=B8=9A?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=8F=90=E4=BE=9B=E8=80=85=E6=9D=A1=E7=9B=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/plugin.py | 10 ++++++++++ app/scheduler.py | 27 ++++++++++++++++----------- app/schemas/dashboard.py | 2 ++ 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/app/core/plugin.py b/app/core/plugin.py index ad04d549..8e4aa2d3 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -246,6 +246,16 @@ class PluginManager(metaclass=Singleton): ret_services.extend(services) return ret_services + def get_plugin_attr(self, pid: str, attr: str) -> Any: + """ + 获取插件属性 + """ + if not self._running_plugins.get(pid): + return None + if not hasattr(self._running_plugins[pid], attr): + return None + return getattr(self._running_plugins[pid], attr) + def run_plugin_method(self, pid: str, method: str, *args, **kwargs) -> Any: """ 运行插件方法 diff --git a/app/scheduler.py b/app/scheduler.py index d279b4f9..2e27d0fc 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -284,6 +284,8 @@ class Scheduler(metaclass=Singleton): plugin_services = PluginManager().run_plugin_method(pid, "get_service") or [] except: return + # 获取插件名称 + plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name") # 开始注册插件服务 for service in plugin_services: try: @@ -292,6 +294,7 @@ class Scheduler(metaclass=Singleton): "func": service["func"], "name": service["name"], "pid": pid, + "plugin_name": plugin_name, "running": False, } self._scheduler.add_job( @@ -304,7 +307,7 @@ class Scheduler(metaclass=Singleton): 'job_id': sid } ) - logger.info(f"注册插件服务({pid}):{service['name']}") + logger.info(f"注册插件服务({plugin_name}):{service['name']}") except Exception as e: logger.error(f"注册插件服务失败:{str(e)} - {service}") @@ -313,6 +316,8 @@ class Scheduler(metaclass=Singleton): 移除插件定时服务 """ with self._lock: + # 获取插件名称 + plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name") for job_id, service in self._jobs.copy().items(): try: if service.get("pid") == pid: @@ -321,7 +326,7 @@ class Scheduler(metaclass=Singleton): self._scheduler.remove_job(job_id) except: pass - logger.info(f"移除插件服务({pid}):{service.get('name')}") + logger.info(f"移除插件服务({plugin_name}):{service.get('name')}") except Exception as e: logger.error(f"移除插件服务失败:{str(e)} - {job_id}: {service}") @@ -338,16 +343,14 @@ class Scheduler(metaclass=Singleton): # 按照下次运行时间排序 jobs.sort(key=lambda x: x.next_run_time) # 将正在运行的任务提取出来 (保障一次性任务正常显示) - for job_id, value in self._jobs.items(): - name = value.get("name") - if value.get("running") and name: - if name not in added: - added.append(name) - else: - continue + for job_id, service in self._jobs.items(): + name = service.get("name") + plugin_name = service.get("plugin_name") + if service.get("running") and name and plugin_name: schedulers.append(schemas.ScheduleInfo( id=job_id, name=name, + provider=plugin_name, status="正在运行", next_run="~" )) @@ -358,15 +361,17 @@ class Scheduler(metaclass=Singleton): else: continue job_id = job.id.split("|")[0] - if not self._jobs.get(job_id): + service = self._jobs.get(job_id) + if not service: continue # 任务状态 - status = "正在运行" if self._jobs[job_id].get("running") else "等待" + status = "正在运行" if service.get("running") else "等待" # 下次运行时间 next_run = TimerUtils.time_difference(job.next_run_time) schedulers.append(schemas.ScheduleInfo( id=job_id, name=job.name, + provider=service.get("plugin_name", "MoviePilot"), status=status, next_run=next_run )) diff --git a/app/schemas/dashboard.py b/app/schemas/dashboard.py index 02dbbd23..33e83883 100644 --- a/app/schemas/dashboard.py +++ b/app/schemas/dashboard.py @@ -56,6 +56,8 @@ class ScheduleInfo(BaseModel): id: Optional[str] = None # 名称 name: Optional[str] = None + # 提供者 + provider: Optional[str] = None # 状态 status: Optional[str] = None # 下次执行时间